Github user mallman commented on a diff in the pull request:
https://github.com/apache/spark/pull/21320#discussion_r199648692
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
@@ -182,18 +182,20 @@ private[parquet] class ParquetRowConverter(
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
-      case ((parquetFieldType, catalystField), ordinal) =>
-        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
-        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
+    parquetType.getFields.asScala.map {
+      case parquetField =>
+        val fieldIndex = catalystType.fieldIndex(parquetField.getName)
--- End diff ---
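For context on the new lookup: each Parquet field's ordinal in the Catalyst schema is now resolved by name via the public `StructType.fieldIndex` API, rather than by positional zipping. A minimal sketch of that name-based resolution (the schema and field names below are illustrative, not the PR's code):

```
import org.apache.spark.sql.types._

// Hypothetical Catalyst schema; names and types are illustrative only.
val catalystType = StructType(
  StructField("id", IntegerType) ::
  StructField("name", StringType) :: Nil)

// Resolve each Parquet field name to the ordinal of the matching Catalyst
// field; that ordinal is the `currentRow` cell the RowUpdater writes into.
val parquetFieldNames = Seq("name", "id") // Parquet field order may differ from the Catalyst schema
val ordinals = parquetFieldNames.map(catalystType.fieldIndex)
// ordinals == Seq(1, 0)
```

Name-based resolution like this assumes the Catalyst field names are unique, which is why duplicate column names are the concern here.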
I dropped into the `sql/console` and attempted to write a parquet file with
duplicate column names. It didn't work. Transcript below.
```
scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> val sameColumnNames = StructType(StructField("a", IntegerType) :: StructField("a", StringType) :: Nil)
sameColumnNames: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true), StructField(a,StringType,true))

scala> val rowRDD = sqlContext.sparkContext.parallelize(Row(1, "one") :: Row(2, "two") :: Nil, 1)
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:51

scala> val df = sqlContext.createDataFrame(rowRDD, sameColumnNames)
18/07/02 16:31:33 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Volumes/VideoAmpCS/msa/workspace/spark-public/spark-warehouse').
18/07/02 16:31:33 INFO SharedState: Warehouse path is 'file:/Volumes/VideoAmpCS/msa/workspace/spark-public/spark-warehouse'.
18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@7b13b737{/SQL,null,AVAILABLE,@Spark}
18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@3c9fb104{/SQL/json,null,AVAILABLE,@Spark}
18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@3d5cadbe{/SQL/execution,null,AVAILABLE,@Spark}
18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@73732e26{/SQL/execution/json,null,AVAILABLE,@Spark}
18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@72a13c4a{/static/sql,null,AVAILABLE,@Spark}
18/07/02 16:31:34 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
df: org.apache.spark.sql.DataFrame = [a: int, a: string]

scala> df.write.parquet("sameColumnNames.parquet")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into file:/Volumes/VideoAmpCS/msa/workspace/spark-public/sameColumnNames.parquet: `a`;
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:42)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:64)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:662)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
  at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:554)
  ... 42 elided
```
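For anyone who wants a quicker repro outside of `sql/console`, the same steps condensed for `spark-shell` (assuming a `SparkSession` named `spark`; this is just the transcript above without the log noise):

```
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Schema with two fields sharing the name "a".
val sameColumnNames = StructType(StructField("a", IntegerType) :: StructField("a", StringType) :: Nil)
val rowRDD = spark.sparkContext.parallelize(Row(1, "one") :: Row(2, "two") :: Nil, 1)
val df = spark.createDataFrame(rowRDD, sameColumnNames)

// Expected to fail with AnalysisException: "Found duplicate column(s) when inserting into ...: `a`;"
df.write.parquet("sameColumnNames.parquet")
```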
---