holdenk commented on a change in pull request #29959:
URL: https://github.com/apache/spark/pull/29959#discussion_r501927461



##########
File path: core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
##########
@@ -57,11 +50,22 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param parallelismMax The maximum parallelism for listing. If the number of input paths is
    *                       larger than this value, parallelism will be throttled to this value
    *                       to avoid generating too many tasks.
-   * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
-   *                  excluded from the results
    * @return for each input path, the set of discovered files for the path
    */
   def parallelListLeafFiles(
+    sc: SparkContext,
+    paths: Seq[Path],
+    hadoopConf: Configuration,
+    filter: PathFilter,
+    ignoreMissingFiles: Boolean,
+    ignoreLocality: Boolean,
+    parallelismThreshold: Int,
+    parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+    parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, true, ignoreMissingFiles,

Review comment:
       nit: For readability I'd pass this as a named parameter, since a bare boolean isn't very clear at the call site.
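
To illustrate the nit, here is a minimal, self-contained sketch. `listInternal` and the parameter name `isRootLevel` are hypothetical stand-ins chosen for illustration, not the actual signature of `parallelListLeafFilesInternal`:

```scala
object NamedBooleanSketch {
  // Hypothetical stand-in for the internal helper; the parameter names are
  // assumptions made for illustration only.
  def listInternal(
      paths: Seq[String],
      isRootLevel: Boolean,
      ignoreMissingFiles: Boolean): String = {
    s"paths=${paths.size} isRootLevel=$isRootLevel ignoreMissingFiles=$ignoreMissingFiles"
  }

  // Bare booleans: the reader has to look up the signature to know what `true` means.
  val positional: String = listInternal(Seq("/a", "/b"), true, false)

  // Named arguments: the call site documents itself and the result is identical.
  val named: String =
    listInternal(Seq("/a", "/b"), isRootLevel = true, ignoreMissingFiles = false)
}
```

Both calls behave the same; only the second one tells the reader which flag is being set.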

##########
File path: core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
##########
@@ -207,18 +166,14 @@ private[spark] object HadoopFSUtils extends Logging {
     // Note that statuses only include FileStatus for the files and dirs directly under path,
     // and does not include anything else recursively.
     val statuses: Array[FileStatus] = try {
-      fs match {
-        // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
-        // to retrieve the file status with the file block location. The reason to still fallback
-        // to listStatus is because the default implementation would potentially throw a
-        // FileNotFoundException which is better handled by doing the lookups manually below.
-        case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
-          val remoteIter = fs.listLocatedStatus(path)
-          new Iterator[LocatedFileStatus]() {
-            def next(): LocatedFileStatus = remoteIter.next
-            def hasNext(): Boolean = remoteIter.hasNext
-          }.toArray
-        case _ => fs.listStatus(path)
+      if (ignoreLocality) {
+        fs.listStatus(path)
+      } else {
+        val remoteIter = fs.listLocatedStatus(path)

Review comment:
       Is there a chance an FS won't have this implemented, as the previous code's comment suggests?
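
For reference, the guard the removed code relied on can be sketched roughly as below. This is a toy model and not Hadoop's API: `ToyFs`, `LocatedListing`, and the two concrete classes are hypothetical names, and the returned strings stand in for file statuses:

```scala
object GuardedListingSketch {
  // Toy model, not Hadoop's API: every filesystem can do a plain listing.
  trait ToyFs {
    def listStatus(path: String): Seq[String]
  }

  // Only some filesystems offer a listing that also carries location info.
  trait LocatedListing extends ToyFs {
    def listLocatedStatus(path: String): Seq[String]
  }

  final class ToyDistributedFs extends LocatedListing {
    def listStatus(path: String): Seq[String] = Seq(s"$path/part-0")
    def listLocatedStatus(path: String): Seq[String] =
      listStatus(path).map(f => s"$f@host1")
  }

  final class ToyPlainFs extends ToyFs {
    def listStatus(path: String): Seq[String] = Seq(s"$path/part-0")
  }

  // Use the located listing only for filesystems known to support it and only
  // when locality matters; otherwise fall back to the plain listing.
  def list(fs: ToyFs, path: String, ignoreLocality: Boolean): Seq[String] = fs match {
    case located: LocatedListing if !ignoreLocality => located.listLocatedStatus(path)
    case _ => fs.listStatus(path)
  }
}
```

With a guard like this, a filesystem outside the known set never reaches the located code path, which is the behaviour the question above is asking about.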

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
##########
@@ -214,9 +214,9 @@ class FileIndexSuite extends SharedSparkSession {
               assert(leafFiles.isEmpty)
             } else {
               assert(raceCondition == classOf[FileDeletionRaceFileSystem])
-              // One of the two leaf files was missing, but we should still list the other:
-              assert(leafFiles.size == 1)
-              assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
+              // listLocatedStatus will fail as a whole because the default impl calls
+              // getFileBlockLocations
+              assert(leafFiles.isEmpty)

Review comment:
       This seems to indicate the change needs some work: the test used to list the one remaining leaf file, and now it expects an empty result.
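
A rough sketch of the difference in behaviour, assuming (as the new test comment says) that the default located listing resolves block locations file by file after the directory has been listed. `RacyFs` and its methods are hypothetical and do not mirror Hadoop's API:

```scala
import java.io.FileNotFoundException

object ListingRaceSketch {
  // Toy model: `listed` is what the directory listing returns, `present` is
  // what still exists when block locations are looked up afterwards.
  final class RacyFs(listed: Seq[String], present: Set[String]) {
    def listStatus(): Seq[String] = listed

    def blockLocation(file: String): String =
      if (present.contains(file)) s"$file@host1"
      else throw new FileNotFoundException(file)

    // All-or-nothing: one vanished file aborts the whole listing.
    def listLocatedAllOrNothing(): Seq[String] = listStatus().map(blockLocation)

    // Per-file handling: skip files that vanished and keep the rest.
    def listLocatedPerFile(): Seq[String] = listStatus().flatMap { f =>
      try Some(blockLocation(f))
      catch { case _: FileNotFoundException => None }
    }
  }
}
```

In this model, when one of two listed files is deleted mid-listing, the per-file variant still returns the surviving file, while the all-or-nothing variant throws and the caller is left with nothing to return. That matches the regression the updated assertion encodes.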




