LantaoJin commented on a change in pull request #24527: [SPARK-27635][SQL] 
Prevent from splitting too many partitions smaller than row group size in 
Parquet file format
URL: https://github.com/apache/spark/pull/24527#discussion_r281052871
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 ##########
 @@ -420,8 +420,12 @@ case class FileSourceScanExec(
       selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-    val maxSplitBytes =
-      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+    val maxSplitBytes = relation.fileFormat match {
+      case _ : ParquetSource =>
+        fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes // parquet.block.size
+      case _ =>
+        FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+    }
 
 Review comment:
   It may be hard to provide a unit test. This case only occurs in one of our 
jobs, which uses multiple threads to read from one HDFS folder and write to 
different target HDFS folders with different filters. With dynamic resource 
allocation (DRA) enabled, the job launched a very large number of executors, 
with nearly 8000 active tasks. After the job had run for a while, the number of 
tasks in the filter/scan stages increased from 200 to over 5000, and we saw 
many log lines like these:
   
   > 19/04/29 06:13:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:13:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:13:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:15:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4474908 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:16:15 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:16:21 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:16:23 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:16:23 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
   
   This issue goes away in any of four cases:
   
   1. Set "spark.default.parallelism" to a fixed value.
   2. Disable DRA and set num-executors to a low value.
   3. The app cannot get enough resources to launch many executors.
   4. Run the jobs one by one instead of in multiple threads.
   
   Each of these keeps the app from requesting too many partitions, because 
fewer cores are available (or the parallelism is fixed), so maxSplitBytes stays 
large:
   ```scala
     // If spark.default.parallelism is not set: more resources, more cores.
     override def defaultParallelism(): Int = {
       conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
     }
   
     def maxSplitBytes(
         sparkSession: SparkSession,
         selectedPartitions: Seq[PartitionDirectory]): Long = {
       val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
       val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
       val defaultParallelism = sparkSession.sparkContext.defaultParallelism
       val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
       // More cores, smaller bytesPerCore.
       val bytesPerCore = totalBytes / defaultParallelism
       // Smaller bytesPerCore, smaller maxSplitBytes.
       Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
     }
   
     def splitFiles(
         sparkSession: SparkSession,
         file: FileStatus,
         filePath: Path,
         isSplitable: Boolean,
         maxSplitBytes: Long,
         partitionValues: InternalRow): Seq[PartitionedFile] = {
       if (isSplitable) {
         // Smaller maxSplitBytes, more partitions.
         (0L until file.getLen by maxSplitBytes).map { offset =>
           val remaining = file.getLen - offset
           val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
           val hosts = getBlockHosts(getBlockLocations(file), offset, size)
           PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
         }
       } else {
         Seq(getPartitionedFile(file, filePath, partitionValues))
       }
     }
   ```
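   
   To make the effect concrete, here is a minimal standalone sketch of the 
same arithmetic with no Spark dependency. The object name, the helper 
`splitCount`, and the single 1 GiB input file are hypothetical and only for 
illustration. The constants assume the default values of 
spark.sql.files.maxPartitionBytes (128 MiB) and spark.sql.files.openCostInBytes 
(4 MiB, which matches the open cost in the logs above).
   
   ```scala
   object SplitSizeSketch {
     // Assumed defaults: 128 MiB max partition bytes, 4 MiB open cost.
     val DefaultMaxSplitBytes: Long = 128L * 1024 * 1024
     val OpenCostInBytes: Long = 4L * 1024 * 1024
   
     // Same formula as maxSplitBytes above, with the session lookups replaced
     // by plain parameters. totalBytes must already include the open cost
     // of every file.
     def maxSplitBytes(totalBytes: Long, defaultParallelism: Int): Long = {
       val bytesPerCore = totalBytes / defaultParallelism
       math.min(DefaultMaxSplitBytes, math.max(OpenCostInBytes, bytesPerCore))
     }
   
     // Number of offsets in `0L until fileLen by maxSplitBytes`,
     // i.e. ceil(fileLen / maxSplitBytes).
     def splitCount(fileLen: Long, maxSplitBytes: Long): Long =
       (fileLen + maxSplitBytes - 1) / maxSplitBytes
   
     def main(args: Array[String]): Unit = {
       val fileLen = 1024L * 1024 * 1024          // hypothetical 1 GiB splittable file
       val totalBytes = fileLen + OpenCostInBytes // one file plus its open cost
       for (cores <- Seq(2, 8, 64, 8000)) {
         val split = maxSplitBytes(totalBytes, cores)
         println(s"cores=$cores maxSplitBytes=$split splits=${splitCount(fileLen, split)}")
       }
     }
   }
   ```
   
   With these hypothetical numbers, the file yields 8 splits at 2 or 8 cores 
and 256 splits at 8000 cores, because maxSplitBytes bottoms out at the 4 MiB 
open cost. That is the same floor the later log lines show.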
