wangyum opened a new pull request #24679: [SPARK-27807][SQL] Parallel resolve 
leaf statuses InMemoryFileIndex
URL: https://github.com/apache/spark/pull/24679
 
 
   ## What changes were proposed in this pull request?
   
   This PR follows the approach of 
[`ParquetFileFormat.readParquetFootersInParallel`](https://github.com/apache/spark/blob/215609def22da14c464b37374ceae4f53a39a145/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L539-L555)
 to resolve leaf statuses in parallel in `InMemoryFileIndex`.
   
   
   ## How was this patch tested?
   
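   Manual tests. I changed 
[`InMemoryFileIndex.listLeafFiles`](https://github.com/apache/spark/blob/46f9f44918ba2589c6ffc938822cbdfac1c6f4d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L266-L345)
 to the following, which resolves the same leaf statuses both in parallel and sequentially and logs the elapsed time of each pass: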
   ```scala
      /**
       * Benchmark variant of `listLeafFiles`.
       *
       * Lists the leaf files under `path`, then resolves their block locations twice -- once in
       * parallel through `ThreadUtils.parmap` and once sequentially -- and logs the elapsed
       * wall-clock time (milliseconds) of each pass.
       *
       * @param path directory to list
       * @param hadoopConf Hadoop configuration used to obtain the file system of `path`
       * @param filter path filter applied to the listed files; skipped when null
       * @param sessionOpt when defined, nested directories are listed with `bulkListLeafFiles`;
       *                   otherwise they are listed by recursing into this method
       * @return the leaf statuses produced by the parallel pass
       */
      private def listLeafFiles(
          path: Path,
          hadoopConf: Configuration,
          filter: PathFilter,
          sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
        logTrace(s"Listing $path")
        val fs = path.getFileSystem(hadoopConf)

        // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
        // Note that statuses only include FileStatus for the files and dirs directly under path,
        // and does not include anything else recursively.
        val statuses = try fs.listStatus(path) catch {
          case _: FileNotFoundException =>
            logWarning(s"The directory $path was not found. Was it deleted very recently?")
            Array.empty[FileStatus]
        }

        val filteredStatuses =
          statuses.filterNot(status => shouldFilterOut(status.getPath.getName))

        val allLeafStatuses = {
          val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
          val nestedFiles: Seq[FileStatus] = sessionOpt match {
            case Some(session) =>
              bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
            case _ =>
              dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
          }
          val allFiles = topLevelFiles ++ nestedFiles
          if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
        }

        // resolveLeafStatuses is invoked from several threads during the parallel pass and
        // mutable.ArrayBuffer is not thread-safe, so every append below is guarded by the
        // buffer's own monitor.
        val missingFiles = mutable.ArrayBuffer.empty[String]
        val filteredLeafStatuses = allLeafStatuses.filterNot(
          status => shouldFilterOut(status.getPath.getName))

        // Turns a plain FileStatus into a LocatedFileStatus; yields None (and records the path
        // in missingFiles) when the file disappeared before its block locations could be read.
        def resolveLeafStatuses(fileStatus: FileStatus): Option[LocatedFileStatus] =
          fileStatus match {
            case f: LocatedFileStatus =>
              Some(f)

            // NOTE:
            //
            // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
            //   operations, calling `getFileBlockLocations` does no harm here since these file
            //   system implementations don't actually issue RPC for this method.
            //
            // - In this benchmark `getFileBlockLocations` is called both from the parallel
            //   pass and from the sequential pass below so the two can be compared.
            case f =>
              // The other constructor of LocatedFileStatus will call
              // FileStatus.getPermission(), which is very slow on some file system
              // (RawLocalFileSystem, which launches a subprocess and parses the stdout).
              try {
                val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
                  // Store BlockLocation objects to consume less memory
                  if (loc.getClass == classOf[BlockLocation]) {
                    loc
                  } else {
                    new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
                  }
                }
                val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication,
                  f.getBlockSize, f.getModificationTime, 0, null, null, null, null, f.getPath,
                  locations)
                if (f.isSymlink) {
                  lfs.setSymlink(f.getSymlink)
                }
                Some(lfs)
              } catch {
                case _: FileNotFoundException =>
                  missingFiles.synchronized { missingFiles += f.getPath.toString }
                  None
              }
          }

        // Pass 1: resolve on up to 8 threads.
        val start1 = System.currentTimeMillis()
        val parResolvedLeafStatuses = ThreadUtils.parmap(
          filteredLeafStatuses.toSeq, "resolveLeafStatuses", 8)(resolveLeafStatuses).flatten

        // Pass 2: resolve the same statuses sequentially; the result is only needed for timing.
        val start2 = System.currentTimeMillis()
        val defResolvedLeafStatuses = filteredLeafStatuses.flatMap(resolveLeafStatuses)

        val end = System.currentTimeMillis()
        logWarning(s"Elements: ${parResolvedLeafStatuses.size}, " +
          s"parallel time token: ${start2 - start1}, default time token: ${end - start2}.")

        if (missingFiles.nonEmpty) {
          // Both passes visit every status, so a missing file may have been recorded twice;
          // report each path only once.
          logWarning(
            s"the following files were missing during file scan:\n  ${missingFiles.distinct.mkString("\n  ")}")
        }

        parResolvedLeafStatuses
      }
   ```
   
   First run (times are in milliseconds; "time token" in the log output means "time taken"):
   ```
   19/05/22 08:09:55 WARN InMemoryFileIndex: Elements: 10, parallel time token: 
66, default time token: 123.
   19/05/22 08:10:08 WARN InMemoryFileIndex: Elements: 20, parallel time token: 
27, default time token: 202.
   19/05/22 08:10:10 WARN InMemoryFileIndex: Elements: 50, parallel time token: 
40, default time token: 371.
   19/05/22 08:10:13 WARN InMemoryFileIndex: Elements: 100, parallel time 
token: 83, default time token: 664.
   19/05/22 08:10:18 WARN InMemoryFileIndex: Elements: 200, parallel time 
token: 162, default time token: 1174.
   19/05/22 08:10:34 WARN InMemoryFileIndex: Elements: 500, parallel time 
token: 3797, default time token: 7533.
   19/05/22 08:10:43 WARN InMemoryFileIndex: Elements: 1000, parallel time 
token: 88, default time token: 357.
   19/05/22 08:11:24 WARN InMemoryFileIndex: Elements: 2000, parallel time 
token: 3755, default time token: 14304.
   19/05/22 08:13:03 WARN InMemoryFileIndex: Elements: 5000, parallel time 
token: 47553, default time token: 30628.
   19/05/22 08:20:31 WARN InMemoryFileIndex: Elements: 10000, parallel time 
token: 20475, default time token: 23823.
   19/05/22 08:22:01 WARN InMemoryFileIndex: Elements: 20000, parallel time 
token: 1561, default time token: 9339.
   19/05/22 08:23:02 WARN InMemoryFileIndex: Elements: 40000, parallel time 
token: 2829, default time token: 20497.
   19/05/22 08:24:44 WARN InMemoryFileIndex: Elements: 80000, parallel time 
token: 5941, default time token: 51388.
   19/05/22 08:33:55 WARN InMemoryFileIndex: Elements: 692375, parallel time 
token: 51993, default time token: 406825.
   ```
   
   Second run:
   ```
   19/05/22 08:47:17 WARN InMemoryFileIndex: Elements: 10, parallel time token: 
34, default time token: 10.
   19/05/22 08:49:59 WARN InMemoryFileIndex: Elements: 20, parallel time token: 
8, default time token: 22.
   19/05/22 08:50:03 WARN InMemoryFileIndex: Elements: 50, parallel time token: 
14, default time token: 28.
   19/05/22 08:50:23 WARN InMemoryFileIndex: Elements: 100, parallel time 
token: 20, default time token: 73.
   19/05/22 08:50:32 WARN InMemoryFileIndex: Elements: 200, parallel time 
token: 43, default time token: 103.
   19/05/22 08:50:47 WARN InMemoryFileIndex: Elements: 500, parallel time 
token: 166, default time token: 271.
   19/05/22 08:52:08 WARN InMemoryFileIndex: Elements: 1000, parallel time 
token: 113, default time token: 385.
   19/05/22 08:53:44 WARN InMemoryFileIndex: Elements: 2000, parallel time 
token: 257, default time token: 971.
   19/05/22 08:55:01 WARN InMemoryFileIndex: Elements: 5000, parallel time 
token: 753, default time token: 2197.
   19/05/22 08:57:04 WARN InMemoryFileIndex: Elements: 10000, parallel time 
token: 1116, default time token: 4307.
   19/05/22 09:00:39 WARN InMemoryFileIndex: Elements: 20000, parallel time 
token: 3160, default time token: 8983.
   19/05/22 09:14:24 WARN InMemoryFileIndex: Elements: 40000, parallel time 
token: 77537, default time token: 397752.
   19/05/22 09:19:05 WARN InMemoryFileIndex: Elements: 80000, parallel time 
token: 10355, default time token: 91161.
   19/05/22 09:30:05 WARN InMemoryFileIndex: Elements: 692375, parallel time 
token: 71769, default time token: 375620.
   ```
   
   Third run:
   ```
   19/05/22 09:13:21 WARN InMemoryFileIndex: Elements: 10, parallel time token: 
62, default time token: 83.
   19/05/22 09:13:35 WARN InMemoryFileIndex: Elements: 20, parallel time token: 
53, default time token: 299.
   19/05/22 09:13:39 WARN InMemoryFileIndex: Elements: 50, parallel time token: 
71, default time token: 347.
   19/05/22 09:14:31 WARN InMemoryFileIndex: Elements: 100, parallel time 
token: 27, default time token: 2766.
   19/05/22 09:14:41 WARN InMemoryFileIndex: Elements: 200, parallel time 
token: 61, default time token: 235.
   19/05/22 09:15:22 WARN InMemoryFileIndex: Elements: 500, parallel time 
token: 65, default time token: 239.
   19/05/22 09:17:20 WARN InMemoryFileIndex: Elements: 1000, parallel time 
token: 882, default time token: 428.
   19/05/22 09:18:19 WARN InMemoryFileIndex: Elements: 2000, parallel time 
token: 919, default time token: 6229.
   19/05/22 09:20:11 WARN InMemoryFileIndex: Elements: 5000, parallel time 
token: 557, default time token: 9557.
   19/05/22 09:22:27 WARN InMemoryFileIndex: Elements: 10000, parallel time 
token: 838, default time token: 10047.
   19/05/22 09:24:49 WARN InMemoryFileIndex: Elements: 20000, parallel time 
token: 2294, default time token: 9914.
   19/05/22 09:28:18 WARN InMemoryFileIndex: Elements: 40000, parallel time 
token: 2955, default time token: 22586.
   19/05/22 09:30:30 WARN InMemoryFileIndex: Elements: 80000, parallel time 
token: 7447, default time token: 50589.
   19/05/22 09:41:03 WARN InMemoryFileIndex: Elements: 692375, parallel time 
token: 71536, default time token: 406319.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to