Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/21868#discussion_r210799950
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
---
@@ -425,12 +426,44 @@ case class FileSourceScanExec(
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    if (fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled &&
+      (fsRelation.fileFormat.isInstanceOf[ParquetSource] ||
+        fsRelation.fileFormat.isInstanceOf[OrcFileFormat])) {
+      if (relation.dataSchema.map(_.dataType).forall(dataType =>
+        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+        def getTypeLength(dataType: DataType): Int = {
+          if (dataType.isInstanceOf[StructType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+          } else if (dataType.isInstanceOf[ArrayType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+          } else if (dataType.isInstanceOf[MapType]) {
+            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+          } else {
+            dataType.defaultSize
+          }
+        }
+
+        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
--- End diff --
I think his point is that the estimation is very rough, which I agree with.
Partly for that reason, I am less sure whether we should go ahead with this.
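
For readers following along, here is a minimal, standalone sketch of the estimation being discussed, not the patch itself. The `getTypeLength` widths mirror the ones in the diff above; the final step that scales `maxSplitBytes` by the column-size ratio is only my reading of the intent, since the quoted diff ends before that point, and the config constants, schema, and numbers are made up for illustration.

import org.apache.spark.sql.types._

object SplitSizeEstimateSketch {
  // Hypothetical stand-ins for parquetStructTypeLength / parquetArrayTypeLength /
  // parquetMapTypeLength; in the patch these would come from SQLConf.
  val structTypeLength = 20
  val arrayTypeLength = 20
  val mapTypeLength = 20

  // Same idea as getTypeLength in the diff: use a fixed "width" per data type.
  def getTypeLength(dataType: DataType): Int = dataType match {
    case _: StructType => structTypeLength
    case _: ArrayType  => arrayTypeLength
    case _: MapType    => mapTypeLength
    case other         => other.defaultSize
  }

  def main(args: Array[String]): Unit = {
    // Illustrative table schema and a query that selects only "id".
    val dataSchema = StructType(Seq(
      StructField("id", LongType),          // defaultSize = 8
      StructField("payload", StringType)))  // defaultSize = 20, regardless of real width
    val requiredSchema = StructType(Seq(StructField("id", LongType)))

    val totalColumnSize = dataSchema.map(f => getTypeLength(f.dataType)).sum
    val selectedColumnSize = requiredSchema.map(f => getTypeLength(f.dataType)).sum

    // Assumed continuation of the patch: grow the split size when only a
    // fraction of the columns is actually read from the columnar file.
    val defaultMaxSplitBytes = 128L * 1024 * 1024
    val maxSplitBytes = defaultMaxSplitBytes * totalColumnSize / selectedColumnSize
    println(s"selected/total = $selectedColumnSize/$totalColumnSize, maxSplitBytes = $maxSplitBytes")

    // If "payload" is really kilobytes per row and heavily compressed, the fixed
    // defaultSize of 20 badly misestimates its on-disk share, which is why this
    // estimation is considered rough.
  }
}

The sketch shows the failure mode being raised: the ratio is driven by declared type widths, not by the actual encoded and compressed column sizes in the Parquet/ORC footers.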
---