juliuszsompolski opened a new pull request, #38397:
URL: https://github.com/apache/spark/pull/38397

   ### What changes were proposed in this pull request?
   
   We move the decision about supporting columnar output based on whole-stage 
codegen (WSCG) one level up, from ParquetFileFormat to FileSourceScanExec, and 
pass it as a new required option for ParquetFileFormat. The semantics are now 
as follows:
   * `ParquetFileFormat.supportBatch` returns whether it **can**, not 
necessarily **will**, return columnar output.
   * To return columnar output, an option `FileFormat.OPTION_RETURNING_BATCH` 
needs to be passed to `buildReaderWithPartitionValues`. It should only be set 
to `true` if `ParquetFileFormat.supportBatch` is also `true`, but it can be 
set to `false` when columnar output is not wanted even though it is supported. 
This way, `FileSourceScanExec` can set it to `false` when there are more than 
100 columns for WSCG, and `ParquetFileFormat` doesn't have to concern itself 
with WSCG limits.
   * To avoid not passing it by accident, this option is made required. Making 
it required means updating a few places that call the reader, but the error 
that results from omitting it is very obscure, so it's better to fail early 
and explicitly here.
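
The split described in the list above can be sketched without any Spark dependency. This is only an illustration of the intended semantics, not the code in this PR: the object name, the method names other than `supportBatch`, the option key string and the error message are all assumptions made for the example.

```scala
// Hypothetical stand-ins for illustration; these are not Spark's classes.
object ReturningBatchSketch {
  // Assumed key; the description only names FileFormat.OPTION_RETURNING_BATCH.
  val OptionReturningBatch = "returning_batch"

  // The 100-field WSCG threshold mentioned in the description.
  val MaxFields = 100

  // Format level: whether the reader *can* produce columnar batches.
  // No WSCG limit is consulted here.
  def supportBatch(vectorizedReaderEnabled: Boolean, allTypesSupported: Boolean): Boolean =
    vectorizedReaderEnabled && allTypesSupported

  // Scan level: whether the scan *will* ask for batches. Only this
  // layer knows the full output width, metadata columns included.
  def scanWantsBatch(canBatch: Boolean, outputFieldCount: Int): Boolean =
    canBatch && outputFieldCount <= MaxFields

  // Reader level: the option is required, so a caller that forgets it
  // fails immediately instead of hitting a ClassCastException later.
  def readerReturnsBatch(options: Map[String, String]): Boolean =
    options.getOrElse(
      OptionReturningBatch,
      throw new IllegalArgumentException(s"Option $OptionReturningBatch is required")
    ).toBoolean
}
```

In this sketch the scan computes `scanWantsBatch` once and forwards the result as the option value, so the reader and the scan cannot disagree about the output format.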
   
   ### Why are the changes needed?
   
   `java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch 
cannot be cast to org.apache.spark.sql.catalyst.InternalRow` was being thrown 
because ParquetReader was outputting columnar batches, while FileSourceScanExec 
expected row output.
   
   The mismatch comes from the fact that `ParquetFileFormat.supportBatch` 
depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the 
threshold is 100 fields.
   
   When this is used in `FileSourceScanExec`:
   ```
     override lazy val supportsColumnar: Boolean = {
         relation.fileFormat.supportBatch(relation.sparkSession, schema)
     }
   ```
   the `schema` comes from output attributes, which includes extra metadata 
attributes.
   
   However, inside `ParquetFileFormat.buildReaderWithPartitionValues` it was 
calculated again as
   ```
         relation.fileFormat.buildReaderWithPartitionValues(
           sparkSession = relation.sparkSession,
           dataSchema = relation.dataSchema,
           partitionSchema = relation.partitionSchema,
           requiredSchema = requiredSchema,
           filters = pushedDownFilters,
           options = options,
           hadoopConf = hadoopConf
   ...
   val resultSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
   ...
   val returningBatch = supportBatch(sparkSession, resultSchema)
   ```
   
   Where `requiredSchema` and `partitionSchema` wouldn't include the metadata 
columns:
   ```
   FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, 
file_path#6388)
   FileSourceScanExec: dataSchema: 
StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
   FileSourceScanExec: partitionSchema: StructType()
   FileSourceScanExec: requiredSchema: 
StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
   ```
   
   Columns like `file_path#6388` are added by the scan and contain metadata 
produced by the scan, not by the file reader, which concerns itself only with 
what is within the file.
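
The disagreement can be reproduced with a toy model of the two field counts. This is a simplified sketch: `isTooManyFields` here just compares a flat field count against the threshold, which is an assumption about the shape of the real check in `WholeStageCodegenExec`, and all names in the object are made up for the example.

```scala
// Toy model of the two schema computations; not Spark code.
object SchemaMismatchSketch {
  val MaxFields = 100

  // Simplified assumption: a flat field count compared to the threshold.
  def isTooManyFields(fieldCount: Int): Boolean = fieldCount > MaxFields

  val requiredFields: Seq[String] = (1 to 100).map(i => s"c$i")
  val partitionFields: Seq[String] = Seq.empty
  val metadataFields: Seq[String] = Seq("file_path")

  // What FileSourceScanExec checked: output attributes, metadata included.
  val scanOutput: Seq[String] = requiredFields ++ partitionFields ++ metadataFields

  // What the reader recomputed: requiredSchema ++ partitionSchema only.
  val resultSchema: Seq[String] = requiredFields ++ partitionFields

  // 101 fields: over the limit, so the scan expects rows.
  val scanSupportsColumnar: Boolean = !isTooManyFields(scanOutput.size)

  // 100 fields: within the limit, so the reader produced batches.
  val readerReturningBatch: Boolean = !isTooManyFields(resultSchema.size)
}
```

With exactly 100 data columns plus one metadata column, the two checks land on opposite sides of the threshold, which matches the row-versus-batch mismatch behind the `ClassCastException`.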
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Tests added


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
