Github user vgankidi commented on a diff in the pull request:
https://github.com/apache/spark/pull/19633#discussion_r150431620
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -424,11 +424,19 @@ case class FileSourceScanExec(
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
-    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
-    val bytesPerCore = totalBytes / defaultParallelism
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    // Ignore bytesPerCore when dynamic allocation is enabled. See SPARK-22411
+    val maxSplitBytes =
+      if (Utils.isDynamicAllocationEnabled(fsRelation.sparkSession.sparkContext.getConf)) {
+        defaultMaxSplitBytes
--- End diff --
That is true, and in that case this change wouldn't help. That is why I asked whether using `spark.dynamicAllocation.maxExecutors` to calculate `bytesPerCore` would be better when dynamic allocation is enabled.
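Something along these lines is what I had in mind; this is only a rough sketch that reuses identifiers from the diff (`fsRelation`, `totalBytes`, `Utils`), and the config lookups and fallbacks are assumptions for illustration, not the code in this PR:

```scala
// Sketch: derive a parallelism estimate from spark.dynamicAllocation.maxExecutors
// (when set and dynamic allocation is on) instead of the current defaultParallelism.
val sc = fsRelation.sparkSession.sparkContext
val conf = sc.getConf
val effectiveParallelism =
  conf.getOption("spark.dynamicAllocation.maxExecutors") match {
    case Some(maxExecutors) if Utils.isDynamicAllocationEnabled(conf) =>
      maxExecutors.toInt * conf.getInt("spark.executor.cores", 1)
    case _ =>
      sc.defaultParallelism
  }
val bytesPerCore = totalBytes / effectiveParallelism
```

That would keep some dependence on the data size while bounding the parallelism estimate, instead of dropping `bytesPerCore` entirely.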
For small tables (when `bytesPerCore` is less than `defaultMaxSplitBytes`), we have had use cases where we wanted control over the partition size without being limited by `bytesPerCore`, so we internally added a flag to ignore `bytesPerCore`.
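To make the small-table case concrete, here is a rough worked example using the default values of the two configs and an assumed 200-core cluster; all numbers are illustrative:

```scala
// Worked example of the current formula for a small table.
val defaultMaxSplitBytes = 128L * 1024 * 1024        // spark.sql.files.maxPartitionBytes default
val openCostInBytes      = 4L * 1024 * 1024          // spark.sql.files.openCostInBytes default
val totalBytes           = 1L * 1024 * 1024 * 1024   // ~1 GB table (per-file open cost ignored here)
val bytesPerCore         = totalBytes / 200          // ~5 MB on a 200-core cluster
val maxSplitBytes = Math.min(defaultMaxSplitBytes,
  Math.max(openCostInBytes, bytesPerCore))           // ~5 MB, not 128 MB
// The table is read as ~200 small partitions even if larger splits are wanted.
```

With `bytesPerCore` ignored, the same table could instead be read as roughly 8 partitions of up to 128 MB.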
Reasoning about the split size in terms of the two parameters `defaultMaxSplitBytes` and `openCostInBytes` seems more intuitive. What do you think?
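Concretely, the two-parameter view would look roughly like this (a sketch of the idea, not the code in this PR):

```scala
// The split size is governed only by the configured maximum; openCostInBytes
// only matters when packing splits into partitions.
val maxSplitBytes = defaultMaxSplitBytes
// During packing (paraphrasing the existing loop):
//   if (currentSize + split.length > maxSplitBytes) close the current partition
//   currentSize += split.length + openCostInBytes
```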
---