holdenk commented on a change in pull request #29959:
URL: https://github.com/apache/spark/pull/29959#discussion_r501927461
##########
File path: core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
##########
@@ -57,11 +50,22 @@ private[spark] object HadoopFSUtils extends Logging {
* @param parallelismMax The maximum parallelism for listing. If the number of input paths is
*                       larger than this value, parallelism will be throttled to this value
*                       to avoid generating too many tasks.
- * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
- *                  excluded from the results
* @return for each input path, the set of discovered files for the path
*/
def parallelListLeafFiles(
+ sc: SparkContext,
+ paths: Seq[Path],
+ hadoopConf: Configuration,
+ filter: PathFilter,
+ ignoreMissingFiles: Boolean,
+ ignoreLocality: Boolean,
+ parallelismThreshold: Int,
+ parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+   parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, true, ignoreMissingFiles,
Review comment:
nit: For readability I'd pass this as a named parameter, since a bare boolean isn't very clear.
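To illustrate the nit with a toy sketch (the method and parameter names below are assumptions for illustration, not the real `HadoopFSUtils` signature):

```scala
// Illustrative only: a trimmed-down method whose shape mirrors the call above;
// the parameter names are hypothetical, not the actual HadoopFSUtils API.
def listFiles(paths: Seq[String], isRootLevel: Boolean, ignoreMissingFiles: Boolean): String =
  s"root=$isRootLevel, ignoreMissing=$ignoreMissingFiles"

// A bare boolean at the call site is opaque to the reader:
val bare = listFiles(Seq("/data"), true, false)

// Naming the argument documents the intent in place:
val named = listFiles(Seq("/data"), isRootLevel = true, ignoreMissingFiles = false)

assert(bare == named)
```

Named arguments cost nothing at runtime; they only make the call site self-documenting.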
##########
File path: core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
##########
@@ -207,18 +166,14 @@ private[spark] object HadoopFSUtils extends Logging {
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses: Array[FileStatus] = try {
-   fs match {
-     // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
-     // to retrieve the file status with the file block location. The reason to still fallback
-     // to listStatus is because the default implementation would potentially throw a
-     // FileNotFoundException which is better handled by doing the lookups manually below.
-     case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
-       val remoteIter = fs.listLocatedStatus(path)
-       new Iterator[LocatedFileStatus]() {
-         def next(): LocatedFileStatus = remoteIter.next
-         def hasNext(): Boolean = remoteIter.hasNext
-       }.toArray
-     case _ => fs.listStatus(path)
+   if (ignoreLocality) {
+     fs.listStatus(path)
+   } else {
+     val remoteIter = fs.listLocatedStatus(path)
Review comment:
Is there a chance a FS won't have this implemented well, as per the previous code's comment? The removed code only took the `listLocatedStatus` path for filesystem types known to override it.
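The guard-by-type pattern the removed code used can be sketched without Hadoop dependencies (the type names below are stand-ins, not the real `FileSystem` hierarchy):

```scala
// Hypothetical stand-ins for filesystem types; the removed code matched on
// DistributedFileSystem / ViewFileSystem before calling listLocatedStatus.
sealed trait Fs
case object HdfsLike extends Fs   // known to override listLocatedStatus efficiently
case object GenericFs extends Fs  // default impl may throw FileNotFoundException

def listLeaf(fs: Fs, ignoreLocality: Boolean): String = fs match {
  // Only take the located-status path for filesystems known to support it well:
  case HdfsLike if !ignoreLocality => "listLocatedStatus"
  // Everything else falls back to the plain listing:
  case _ => "listStatus"
}
```

The unconditional `else` branch in the new code loses this per-filesystem distinction.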
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
##########
@@ -214,9 +214,9 @@ class FileIndexSuite extends SharedSparkSession {
assert(leafFiles.isEmpty)
} else {
assert(raceCondition == classOf[FileDeletionRaceFileSystem])
-          // One of the two leaf files was missing, but we should still list the other:
-          assert(leafFiles.size == 1)
-          assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
+          // listLocatedStatus will fail as a whole because the default impl calls
+          // getFileBlockLocations
+          assert(leafFiles.isEmpty)
Review comment:
This seems to indicate the change needs some work.
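The behavioral difference the test diff reveals can be modeled in miniature (a toy sketch, not the Spark code):

```scala
import scala.util.Try

// Toy model: one of two entries is deleted between listing and stat-ing it.
val entries = Seq("kept", "deleted")
def stat(name: String): String =
  if (name == "deleted") throw new java.io.FileNotFoundException(name) else name

// Bulk call: a single missing entry fails the whole listing, yielding nothing.
val bulk = Try(entries.map(stat)).getOrElse(Seq.empty)

// Per-entry lookups: a missing file can be skipped individually.
val perEntry = entries.flatMap(n => Try(stat(n)).toOption)

assert(bulk.isEmpty)
assert(perEntry == Seq("kept"))
```

The old behavior matched the per-entry case (the surviving file is still listed); the new test asserts the bulk-failure behavior instead.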