bersprockets commented on a change in pull request #34659:
URL: https://github.com/apache/spark/pull/34659#discussion_r771851647
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -169,8 +170,8 @@ class ParquetFileFormat
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
     conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      schema.length <= conf.wholeStageMaxNumFields &&
-      schema.forall(_.dataType.isInstanceOf[AtomicType])
+      ParquetFileFormat.isBatchReadSupported(conf, schema) &&
Review comment:
When `spark.sql.parquet.enableNestedColumnVectorizedReader=false`, which will be the normal case initially, I think `returningBatch` will always be false, even for non-complex cases, since this [line](https://github.com/apache/spark/blob/7b6e083833c807e44b76bc321d32e9c5857e648f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L249) always passes a struct (`resultSchema`) to `supportBatch`.

At least, that's what my testing seems to show. With this PR and `spark.sql.parquet.enableNestedColumnVectorizedReader=false`, `returningBatch` is false for a non-complex case, but on master, `returningBatch` is true for the same test:
```scala
spark.range(0, 10).map { x => (x, x + 1, x + 3) }.toDF("a", "b", "c").
  write.mode("overwrite").format("parquet").save("simple_parquet")
sql("select a, b from `parquet`.`simple_parquet`").collect
```
and...
```
bash-3.2$ git diff | cat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bd8d11d827..96182aafa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -340,6 +340,7 @@ class ParquetFileFormat
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        print(s"returningBatch is $returningBatch\n")
         if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
bash-3.2$
```
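To make the concern concrete, here is a minimal, self-contained sketch of the suspected behavior. The types and method names below are illustrative stand-ins, not Spark's actual classes: the idea is that a batch-support check which examines the schema value itself (rather than each column's type) sees the top-level `resultSchema` as a `StructType`, and so rejects even flat schemas whenever nested vectorized reads are disabled:

```scala
// NOTE: hypothetical stand-ins for Spark's type hierarchy, for illustration only.
sealed trait DataType
trait AtomicType extends DataType
case object LongType extends AtomicType
case class StructField(name: String, dataType: DataType)
case class StructType(fields: Seq[StructField]) extends DataType

// Pre-PR supportBatch check: inspect each column's type, so flat schemas pass.
def supportBatchOld(schema: StructType): Boolean =
  schema.fields.forall(_.dataType.isInstanceOf[AtomicType])

// Suspected post-PR behavior: the check is applied to the schema value itself.
// The top-level resultSchema is a StructType, so with nested reads disabled it
// fails immediately, even when every column is atomic.
def isBatchReadSupportedSuspected(nestedEnabled: Boolean, dt: DataType): Boolean =
  dt match {
    case _: AtomicType => true
    case s: StructType =>
      nestedEnabled && s.fields.forall(f => isBatchReadSupportedSuspected(nestedEnabled, f.dataType))
    case _ => false
  }

val flat = StructType(Seq(StructField("a", LongType), StructField("b", LongType)))
println(supportBatchOld(flat))                      // flat schema passes the old check
println(isBatchReadSupportedSuspected(false, flat)) // rejected when nested reads are off
println(isBatchReadSupportedSuspected(true, flat))  // accepted when nested reads are on
```

If that reading is right, the fix would be to keep applying the nested-type restriction to the columns of the schema rather than to the schema value as a whole.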
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]