Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20933#discussion_r179515128
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -396,107 +395,34 @@ case class FileSourceScanExec(
           readFile: (PartitionedFile) => Iterator[InternalRow],
           selectedPartitions: Seq[PartitionDirectory],
           fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    -    val defaultMaxSplitBytes =
    -      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    +    val maxSplitBytes = PartitionedFileUtil.maxSplitBytes(relation.sparkSession, selectedPartitions)
         val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    -    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
    -    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    -    val bytesPerCore = totalBytes / defaultParallelism
    -
    -    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
         logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
           s"open cost is considered as scanning $openCostInBytes bytes.")
     
         val splitFiles = selectedPartitions.flatMap { partition =>
           partition.files.flatMap { file =>
    -        val blockLocations = getBlockLocations(file)
    -        if (fsRelation.fileFormat.isSplitable(
    -            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
    -          (0L until file.getLen by maxSplitBytes).map { offset =>
    -            val remaining = file.getLen - offset
    -            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
    -            val hosts = getBlockHosts(blockLocations, offset, size)
    -            PartitionedFile(
    -              partition.values, file.getPath.toUri.toString, offset, size, hosts)
    -          }
    -        } else {
    -          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
    -          Seq(PartitionedFile(
    -            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
    -        }
    +        // getPath() is very expensive so we only want to call it once in this block:
    --- End diff --
    
    This can be taken care of in `PartitionedFileUtil.splitFiles`.
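    
    For illustration, here is a rough sketch of what hoisting the `getPath()` call into a shared helper could look like. The signature and parameter names below are made up for the example (the real `PartitionedFileUtil.splitFiles` in this PR may look different); the point is only that the path is computed once per file and then reused for both the splitability check and every `PartitionedFile` produced from that file:
    
        // Hypothetical sketch, not the actual PartitionedFileUtil.splitFiles from this PR.
        import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
        import org.apache.spark.sql.catalyst.InternalRow
        import org.apache.spark.sql.execution.datasources.PartitionedFile
    
        def splitFiles(
            file: FileStatus,
            blockLocations: Array[BlockLocation],
            isSplitable: Path => Boolean,
            maxSplitBytes: Long,
            partitionValues: InternalRow,
            getBlockHosts: (Array[BlockLocation], Long, Long) => Array[String]): Seq[PartitionedFile] = {
          val filePath = file.getPath                 // getPath() called exactly once per file
          val pathString = filePath.toUri.toString
          if (isSplitable(filePath)) {
            // Split a splitable file into maxSplitBytes-sized chunks, reusing the cached path.
            (0L until file.getLen by maxSplitBytes).map { offset =>
              val size = math.min(maxSplitBytes, file.getLen - offset)
              PartitionedFile(partitionValues, pathString, offset, size,
                getBlockHosts(blockLocations, offset, size))
            }
          } else {
            // Non-splitable files become a single PartitionedFile covering the whole file.
            Seq(PartitionedFile(partitionValues, pathString, 0, file.getLen,
              getBlockHosts(blockLocations, 0, file.getLen)))
          }
        }
    
    Something along these lines keeps the per-file caching concern in one place instead of repeating it at every call site.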

