juliuszsompolski opened a new pull request, #38397:
URL: https://github.com/apache/spark/pull/38397
### What changes were proposed in this pull request?
We move the decision about whether to return columnar output (which depends on WSCG limits) up one level, from `ParquetFileFormat` to `FileSourceScanExec`, and pass it back down as a new required option of `ParquetFileFormat`. The semantics are now as follows:
* `ParquetFileFormat.supportBatch` returns whether the format **can** return columnar output, not necessarily whether it **will**.
* To actually return columnar output, the option `FileFormat.OPTION_RETURNING_BATCH` needs to be passed to `buildReaderWithPartitionValues`. It may only be set to `true` if `ParquetFileFormat.supportBatch` is also `true`, but it can be set to `false` when we don't want columnar output anyway. This way, `FileSourceScanExec` can set it to `false` when there are more than 100 columns for WSCG, and `ParquetFileFormat` doesn't have to concern itself with WSCG limits.
* To avoid forgetting to pass it by accident, the option is made required. This means updating the few places that call the reader, but the error that resulted from the missing decision was very obscure; it is better to fail early and explicitly.
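The contract described above could look roughly like this (a hedged sketch, not the exact patch: `FileFormat.OPTION_RETURNING_BATCH` and `supportBatch` are named in this PR, but the surrounding wiring and the error type are simplifications):

```
// FileSourceScanExec: decide once, over the full output schema
// (data columns plus metadata columns such as file_path):
override lazy val supportsColumnar: Boolean =
  relation.fileFormat.supportBatch(relation.sparkSession, schema)

// ...and pass the decision down explicitly as the new required option:
val readerOptions = relation.options +
  (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)

// ParquetFileFormat.buildReaderWithPartitionValues: fail fast if absent,
// instead of silently recomputing the decision from a different schema.
val returningBatch = options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
  throw new IllegalArgumentException(
    s"${FileFormat.OPTION_RETURNING_BATCH} is required")).toBoolean
```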
### Why are the changes needed?
`java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch
cannot be cast to org.apache.spark.sql.catalyst.InternalRow` was being thrown
because ParquetReader was outputting columnar batches, while FileSourceScanExec
expected row output.
The mismatch comes from the fact that `ParquetFileFormat.supportBatch`
depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the
threshold is 100 fields.
When this is used in `FileSourceScanExec`:
```
override lazy val supportsColumnar: Boolean = {
  relation.fileFormat.supportBatch(relation.sparkSession, schema)
}
```
the `schema` comes from output attributes, which includes extra metadata
attributes.
However, the decision was computed a second time inside `ParquetFileFormat.buildReaderWithPartitionValues`, which `FileSourceScanExec` calls as
```
relation.fileFormat.buildReaderWithPartitionValues(
  sparkSession = relation.sparkSession,
  dataSchema = relation.dataSchema,
  partitionSchema = relation.partitionSchema,
  requiredSchema = requiredSchema,
  filters = pushedDownFilters,
  options = options,
  hadoopConf = hadoopConf)
```
and which recomputed the flag from a different schema:
```
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
...
val returningBatch = supportBatch(sparkSession, resultSchema)
```
Here, `requiredSchema` and `partitionSchema` do not include the metadata columns:
```
FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L,
file_path#6388)
FileSourceScanExec: dataSchema:
StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
FileSourceScanExec: partitionSchema: StructType()
FileSourceScanExec: requiredSchema:
StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
```
Columns like `file_path#6388` are added by the scan itself: they carry metadata produced by the scan, not by the file reader, which only concerns itself with what is inside the file.
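To make the off-by-one concrete, here is a small self-contained model of the mismatch in the example above (hypothetical helper names; the real check is `WholeStageCodegenExec.isTooManyFields` with its 100-field threshold):

```
// The scan's output has 101 fields (c1..c100 plus the file_path metadata
// column); the reader's resultSchema has only the 100 data columns.
val wscgFieldLimit = 100
def supportBatch(numFields: Int): Boolean = numFields <= wscgFieldLimit

val scanOutputFields = 100 + 1  // output: c1..c100 ++ file_path
val resultSchemaFields = 100    // requiredSchema ++ partitionSchema (empty)

assert(!supportBatch(scanOutputFields))  // scan: supportsColumnar == false, expects rows
assert(supportBatch(resultSchemaFields)) // reader: returningBatch == true, emits batches
// -> ClassCastException: ColumnarBatch cannot be cast to InternalRow
```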
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]