LantaoJin commented on a change in pull request #24527: [SPARK-27635][SQL] Prevent from splitting too many partitions smaller than row group size in Parquet file format
URL: https://github.com/apache/spark/pull/24527#discussion_r281052871
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -420,8 +420,12 @@ case class FileSourceScanExec(
       selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-    val maxSplitBytes =
-      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+    val maxSplitBytes = relation.fileFormat match {
+      case _: ParquetSource =>
+        fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes // parquet.block.size
+      case _ =>
+        FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+    }
Review comment:
It may be hard to provide a UT. This case only happens in one of our jobs, in which we used multiple threads to read from one HDFS folder and write to different target HDFS folders with different filters. With DRA enabled, the job launched more and more executors, reaching nearly 8000 active tasks. After the job ran for a while, the task number of the filter/scan stages increased from 200 to over 5000, and we got many logs like the below:
> 19/04/29 06:13:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
> 19/04/29 06:13:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
> 19/04/29 06:13:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
> 19/04/29 06:15:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4474908 bytes, open cost is considered as scanning 4194304 bytes.
> 19/04/29 06:16:15 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
> 19/04/29 06:16:21 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
> 19/04/29 06:16:23 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
> 19/04/29 06:16:23 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
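For concreteness, here is a minimal sketch of the `maxSplitBytes` formula (quoted in full below) plugging in the 4194304-byte open cost from these logs. The `totalBytes` value is an assumption chosen to reproduce the logged max sizes:
```scala
// Minimal sketch of Spark's maxSplitBytes arithmetic; totalBytes is an
// assumed scan size chosen to reproduce the log values above.
val openCostInBytes = 4194304L        // spark.sql.files.openCostInBytes default (4 MB)
val defaultMaxSplitBytes = 134217728L // spark.sql.files.maxPartitionBytes default (128 MB)
val totalBytes = 25805307800L         // assumption: ~25.8 GB scanned
for (cores <- Seq(200, 2000, 8000)) {
  val bytesPerCore = totalBytes / cores
  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  println(s"cores=$cores -> maxSplitBytes=$maxSplitBytes")
}
// cores=200  -> maxSplitBytes=129026539 (the early log lines)
// cores=2000 -> maxSplitBytes=12902653
// cores=8000 -> maxSplitBytes=4194304  (pinned at the open cost, the later log lines)
```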
This issue goes away in four cases:
1. Set "spark.default.parallelism" to a fixed value.
2. Disable DRA and set num-executors to a low value.
3. The app cannot get enough resources to launch many executors.
4. Run jobs one by one instead of in multiple threads.
All of the above prevent the app from producing too many partitions, because fewer cores mean a larger bytesPerCore:
```scala
override def defaultParallelism(): Int = { // if not set: more resources, more cores
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism // more cores, less bytesPerCore
  // less bytesPerCore, less maxSplitBytes
  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}

def splitFiles(
    sparkSession: SparkSession,
    file: FileStatus,
    filePath: Path,
    isSplitable: Boolean,
    maxSplitBytes: Long,
    partitionValues: InternalRow): Seq[PartitionedFile] = {
  if (isSplitable) {
    (0L until file.getLen by maxSplitBytes).map { offset => // less maxSplitBytes, more partitions
      val remaining = file.getLen - offset
      val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
      val hosts = getBlockHosts(getBlockLocations(file), offset, size)
      PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
    }
  } else {
    Seq(getPartitionedFile(file, filePath, partitionValues))
  }
}
```
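To see why this hurts Parquet specifically, here is a hypothetical illustration (the file and row group sizes are assumptions) of what `splitFiles` does at that floor. Since Parquet readers consume whole row groups, most of the resulting tasks read no row group at all:
```scala
// Hypothetical example: a single 128 MB Parquet file, assumed to hold one
// ~128 MB row group, split with the collapsed maxSplitBytes from the logs.
val fileLen = 134217728L      // assumed 128 MB file
val maxSplitBytes = 4194304L  // the 4 MB floor reached above
val numSplits = (0L until fileLen by maxSplitBytes).size
println(numSplits) // 32 partitions for one row group; at most one task does real work
```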