sunchao commented on a change in pull request #33888:
URL: https://github.com/apache/spark/pull/33888#discussion_r702020262
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -323,15 +344,27 @@ object ParquetReadSupport {
   * @return A list of clipped [[GroupType]] fields, which can be empty.
   */
  private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
+      parquetRecord: GroupType,
+      structType: StructType,
+      caseSensitive: Boolean,
+      accessByColumnOrdinal: Boolean): Seq[Type] = {
     val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
-    if (caseSensitive) {
+    if (accessByColumnOrdinal) {
+      structType.zipWithIndex.map { case (f, i) =>
+        if (i < parquetRecord.getFieldCount) {
+          clipParquetType(
+            parquetRecord.getType(i), f.dataType, caseSensitive, accessByColumnOrdinal)
+        } else {
+          toParquet.convertField(f)
+        }
+      }
+    } else if (caseSensitive) {
       val caseSensitiveParquetFieldMap =
         parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
       structType.map { f =>
         caseSensitiveParquetFieldMap
           .get(f.name)
-          .map(clipParquetType(_, f.dataType, caseSensitive))
+          .map(clipParquetType(_, f.dataType, caseSensitive, accessByColumnOrdinal))
Review comment:
Why do we need to pass down `accessByColumnOrdinal`? It is always false here, right? Also, it seems we can't do this on nested columns?
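For context, here is a minimal standalone sketch (plain Scala; `ParquetField` and `CatalystField` are hypothetical stand-ins for the real Parquet `Type` and Spark `StructField`, and the result strings stand in for clipped types) contrasting the by-name lookup with the by-ordinal pairing that this diff introduces:

```scala
// Minimal sketch of the two matching strategies in clipParquetGroupFields.
object ClipSketch {
  case class ParquetField(name: String, tpe: String)   // stand-in for parquet Type
  case class CatalystField(name: String, tpe: String)  // stand-in for StructField

  // By name (case-sensitive branch): look each requested field up in the file schema.
  def clipByName(file: Seq[ParquetField], requested: Seq[CatalystField]): Seq[String] =
    requested.map { f =>
      file.find(_.name == f.name)
        .map(p => s"clip(${p.name})")
        .getOrElse(s"convert(${f.name})") // missing from the file: synthesize from catalyst
    }

  // By ordinal (accessByColumnOrdinal branch): pair the i-th requested field with the
  // i-th file field, ignoring names entirely.
  def clipByOrdinal(file: Seq[ParquetField], requested: Seq[CatalystField]): Seq[String] =
    requested.zipWithIndex.map { case (f, i) =>
      if (i < file.size) s"clip(${file(i).name})"
      else s"convert(${f.name})" // requested more columns than the file has
    }

  def main(args: Array[String]): Unit = {
    val file      = Seq(ParquetField("_c0", "int"), ParquetField("_c1", "string"))
    val requested = Seq(CatalystField("id", "int"), CatalystField("name", "string"),
                        CatalystField("extra", "long"))
    println(clipByName(file, requested))    // List(convert(id), convert(name), convert(extra))
    println(clipByOrdinal(file, requested)) // List(clip(_c0), clip(_c1), convert(extra))
  }
}
```

Note that the by-ordinal branch never consults field names, which is what makes the nested-columns question above relevant: any recursive call still has to decide how to pair children.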
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -200,19 +200,28 @@ private[parquet] class ParquetRowConverter(
   // Converters for each field.
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
-    // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    if (SQLConf.get.parquetAccessByIndex) {
+      // SPARK-36634: When accessing a Parquet file by column index, we cannot
+      // ensure that the two types match
+      parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
Review comment:
what if `len(parquetType.getFields) != len(catalystType)`? I think it's not always guaranteed because of `ParquetReadSupport.intersectParquetGroups`, e.g., some fields in the catalyst requested schema may not appear in the Parquet file schema.
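To make the width-mismatch concern concrete, here is a small self-contained sketch (plain Scala, hypothetical field lists) showing that Scala's `zip` truncates to the shorter collection instead of failing:

```scala
object ZipHazardSketch {
  def main(args: Array[String]): Unit = {
    val parquetFields  = Seq("a", "b")       // e.g. fields that survived intersectParquetGroups
    val catalystFields = Seq("a", "b", "c")  // fields in the requested catalyst schema

    // zip pairs only the first two elements; "c" is dropped with no error,
    // so the trailing catalyst field would silently get no converter.
    println(parquetFields.zip(catalystFields)) // List((a,a), (b,b))

    // A defensive variant asserts the invariant before relying on positions;
    // here the require fails, surfacing the mismatch that zip hid.
    require(parquetFields.length == catalystFields.length,
      s"schema width mismatch: ${parquetFields.length} vs ${catalystFields.length}")
  }
}
```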
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -139,7 +139,8 @@ private[parquet] class ParquetRowConverter(
     convertTz: Option[ZoneId],
     datetimeRebaseMode: LegacyBehaviorPolicy.Value,
     int96RebaseMode: LegacyBehaviorPolicy.Value,
-    updater: ParentContainerUpdater)
+    updater: ParentContainerUpdater,
+    parquetAccessByOrdinal: Boolean)
Review comment:
nit: `parquetAccessByOrdinal` -> `accessByOrdinal`? `parquet` seems redundant in this context. Also, please update the method comments to document the new parameter.
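One possible shape of the suggested rename, with the doc comment updated to cover the new parameter (the stub trait and trimmed parameter list below are illustrative only, not the real signature):

```scala
// Stand-in for the Spark-internal trait, so the sketch compiles on its own.
trait ParentContainerUpdater

/**
 * @param updater         Updater that propagates converted values to the parent container
 * @param accessByOrdinal Whether Parquet fields are matched to the catalyst schema by
 *                        position instead of by name
 */
class RowConverterSketch(
    updater: ParentContainerUpdater,
    accessByOrdinal: Boolean)
```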
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]