Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22148#discussion_r211149160
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
---
@@ -277,14 +291,35 @@ private[parquet] object ParquetReadSupport {
* @return A list of clipped [[GroupType]] fields, which can be empty.
*/
private def clipParquetGroupFields(
- parquetRecord: GroupType, structType: StructType): Seq[Type] = {
- val parquetFieldMap = parquetRecord.getFields.asScala.map(f =>
f.getName -> f).toMap
+ parquetRecord: GroupType, structType: StructType, caseSensitive:
Boolean): Seq[Type] = {
val toParquet = new
SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
- structType.map { f =>
- parquetFieldMap
- .get(f.name)
- .map(clipParquetType(_, f.dataType))
- .getOrElse(toParquet.convertField(f))
+ if (caseSensitive) {
+ val caseSensitiveParquetFieldMap =
+ parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+ structType.map { f =>
+ caseSensitiveParquetFieldMap
+ .get(f.name)
+ .map(clipParquetType(_, f.dataType, caseSensitive))
+ .getOrElse(toParquet.convertField(f))
+ }
+ } else {
+ // Do case-insensitive resolution only if in case-insensitive mode
+ val caseInsensitiveParquetFieldMap =
+ parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase)
+ structType.map { f =>
+ caseInsensitiveParquetFieldMap
+ .get(f.name.toLowerCase)
+ .map { parquetTypes =>
+ if (parquetTypes.size > 1) {
+ // Need to fail if there is ambiguity, i.e. more than one
field is matched
+ val parquetTypesString =
parquetTypes.map(_.getName).mkString("[", ", ", "]")
+ throw new AnalysisException(s"""Found duplicate field(s)
"${f.name}": """ +
--- End diff --
This is triggered at runtime on the executor side, so we should probably use
`RuntimeException` here.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]