wangyum opened a new pull request #24679: [SPARK-27807][SQL] Parallel resolve leaf statuses InMemoryFileIndex URL: https://github.com/apache/spark/pull/24679 ## What changes were proposed in this pull request? This PR follows the approach of [`ParquetFileFormat.readParquetFootersInParallel`](https://github.com/apache/spark/blob/215609def22da14c464b37374ceae4f53a39a145/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L539-L555) to resolve leaf statuses in parallel in `InMemoryFileIndex`. ## How was this patch tested? Manual tests. To compare the parallel and sequential timings, [`InMemoryFileIndex.listLeafFiles`](https://github.com/apache/spark/blob/46f9f44918ba2589c6ffc938822cbdfac1c6f4d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L266-L345) was temporarily changed to: ```scala private def listLeafFiles( path: Path, hadoopConf: Configuration, filter: PathFilter, sessionOpt: Option[SparkSession]): Seq[FileStatus] = { logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. val statuses = try fs.listStatus(path) catch { case _: FileNotFoundException => logWarning(s"The directory $path was not found. 
Was it deleted very recently?") Array.empty[FileStatus] } val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) val allLeafStatuses = { val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) val nestedFiles: Seq[FileStatus] = sessionOpt match { case Some(session) => bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2) case _ => dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt)) } val allFiles = topLevelFiles ++ nestedFiles if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles } val missingFiles = mutable.ArrayBuffer.empty[String] val filteredLeafStatuses = allLeafStatuses.filterNot( status => shouldFilterOut(status.getPath.getName)) def resolveLeafStatuses(fileStatus: FileStatus): Option[LocatedFileStatus] = fileStatus match { case f: LocatedFileStatus => Some(f) // NOTE: // // - Although S3/S3A/S3N file system can be quite slow for remote file metadata // operations, calling `getFileBlockLocations` does no harm here since these file system // implementations don't actually issue RPC for this method. // // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not // be a big deal since we always use to `bulkListLeafFiles` when the number of // paths exceeds threshold. case f => // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), // which is very slow on some file system (RawLocalFileSystem, which is launch a // subprocess and parse the stdout). 
try { val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc => // Store BlockLocation objects to consume less memory if (loc.getClass == classOf[BlockLocation]) { loc } else { new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength) } } val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime, 0, null, null, null, null, f.getPath, locations) if (f.isSymlink) { lfs.setSymlink(f.getSymlink) } Some(lfs) } catch { case _: FileNotFoundException => missingFiles += f.getPath.toString None } } val start1 = System.currentTimeMillis() val parResolvedLeafStatuses = ThreadUtils.parmap( filteredLeafStatuses.toSeq, "resolveLeafStatuses", 8)(resolveLeafStatuses).flatten val start2 = System.currentTimeMillis() val defResolvedLeafStatuses = filteredLeafStatuses.flatMap(resolveLeafStatuses) val end = System.currentTimeMillis() logWarning(s"Elements: ${parResolvedLeafStatuses.size}, " + s"parallel time token: ${start2 - start1}, default time token: ${end - start2}.") if (missingFiles.nonEmpty) { logWarning( s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}") } parResolvedLeafStatuses } ``` The first time: ``` 19/05/22 08:09:55 WARN InMemoryFileIndex: Elements: 10, parallel time token: 66, default time token: 123. 19/05/22 08:10:08 WARN InMemoryFileIndex: Elements: 20, parallel time token: 27, default time token: 202. 19/05/22 08:10:10 WARN InMemoryFileIndex: Elements: 50, parallel time token: 40, default time token: 371. 19/05/22 08:10:13 WARN InMemoryFileIndex: Elements: 100, parallel time token: 83, default time token: 664. 19/05/22 08:10:18 WARN InMemoryFileIndex: Elements: 200, parallel time token: 162, default time token: 1174. 19/05/22 08:10:34 WARN InMemoryFileIndex: Elements: 500, parallel time token: 3797, default time token: 7533. 19/05/22 08:10:43 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 88, default time token: 357. 
19/05/22 08:11:24 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 3755, default time token: 14304. 19/05/22 08:13:03 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 47553, default time token: 30628. 19/05/22 08:20:31 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 20475, default time token: 23823. 19/05/22 08:22:01 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 1561, default time token: 9339. 19/05/22 08:23:02 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 2829, default time token: 20497. 19/05/22 08:24:44 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 5941, default time token: 51388. 19/05/22 08:33:55 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 51993, default time token: 406825. ``` The second time: ``` 19/05/22 08:47:17 WARN InMemoryFileIndex: Elements: 10, parallel time token: 34, default time token: 10. 19/05/22 08:49:59 WARN InMemoryFileIndex: Elements: 20, parallel time token: 8, default time token: 22. 19/05/22 08:50:03 WARN InMemoryFileIndex: Elements: 50, parallel time token: 14, default time token: 28. 19/05/22 08:50:23 WARN InMemoryFileIndex: Elements: 100, parallel time token: 20, default time token: 73. 19/05/22 08:50:32 WARN InMemoryFileIndex: Elements: 200, parallel time token: 43, default time token: 103. 19/05/22 08:50:47 WARN InMemoryFileIndex: Elements: 500, parallel time token: 166, default time token: 271. 19/05/22 08:52:08 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 113, default time token: 385. 19/05/22 08:53:44 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 257, default time token: 971. 19/05/22 08:55:01 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 753, default time token: 2197. 19/05/22 08:57:04 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 1116, default time token: 4307. 
19/05/22 09:00:39 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 3160, default time token: 8983. 19/05/22 09:14:24 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 77537, default time token: 397752. 19/05/22 09:19:05 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 10355, default time token: 91161. 19/05/22 09:30:05 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 71769, default time token: 375620. ``` The third time: ``` 19/05/22 09:13:21 WARN InMemoryFileIndex: Elements: 10, parallel time token: 62, default time token: 83. 19/05/22 09:13:35 WARN InMemoryFileIndex: Elements: 20, parallel time token: 53, default time token: 299. 19/05/22 09:13:39 WARN InMemoryFileIndex: Elements: 50, parallel time token: 71, default time token: 347. 19/05/22 09:14:31 WARN InMemoryFileIndex: Elements: 100, parallel time token: 27, default time token: 2766. 19/05/22 09:14:41 WARN InMemoryFileIndex: Elements: 200, parallel time token: 61, default time token: 235. 19/05/22 09:15:22 WARN InMemoryFileIndex: Elements: 500, parallel time token: 65, default time token: 239. 19/05/22 09:17:20 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 882, default time token: 428. 19/05/22 09:18:19 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 919, default time token: 6229. 19/05/22 09:20:11 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 557, default time token: 9557. 19/05/22 09:22:27 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 838, default time token: 10047. 19/05/22 09:24:49 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 2294, default time token: 9914. 19/05/22 09:28:18 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 2955, default time token: 22586. 19/05/22 09:30:30 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 7447, default time token: 50589. 
19/05/22 09:41:03 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 71536, default time token: 406319. ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
