Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17758#discussion_r124293746
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -185,8 +183,21 @@ case class DataSource(
         }
     
         SchemaUtils.checkColumnNameDuplication(
    -      (dataSchema ++ partitionSchema).map(_.name), "in the data schema and the partition schema",
    -      sparkSession.sessionState.conf.caseSensitiveAnalysis)
    +      dataSchema.map(_.name), "in the data schema", equality)
    +    SchemaUtils.checkColumnNameDuplication(
    +      partitionSchema.map(_.name), "in the partition schema", equality)
    +
    +    // We just print a warning message if the data schema and partition schema have the duplicate
    +    // columns. This is because we allow users to do so in the previous Spark releases and
    +    // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
    +    // See SPARK-18108 and SPARK-21144 for related discussions.
    +    try {
    +      SchemaUtils.checkColumnNameDuplication(
    --- End diff --
    
    I rechecked the related code paths again, but I couldn't find this issue (`dataSchema` and `partitionSchema` having duplicate column names) in other data sources. Actually, I think this issue happens only in file-based data sources, when users directly write partition directories (e.g., `Seq(1, 2, 3).toDF("a").write.parquet(s"$path/a=1")`).
    
    In catalog tables, `dataSchema` and `partitionSchema` can't have duplicate names in the write paths;
    ```
    scala> sql("""CREATE TABLE t1(a INT, b INT, c INT) PARTITIONED BY (a INT)""")
    org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the table definition of `default`.`t1`: `a`;
      at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85)
      at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:42)
      at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.org$apache$spark$sql$execution$datasources$PreprocessTableCreation$$normalizeCatalogTable(rules.scala:226)

    scala> sql("""CREATE TABLE t2(a INT, b INT, c INT)""")
    scala> sql("""ALTER TABLE t2 ADD PARTITION (a = 1)""")
    org.apache.spark.sql.AnalysisException: a is not a valid partition column in table `default`.`t2`.;
      at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$7$$anonfun$9.apply(PartitioningUtils.scala:300)
      at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$7$$anonfun$9.apply(PartitioningUtils.scala:300)
      at scala.Option.getOrElse(Option.scala:121)
    ```
    
    In stream sources, schemas have no partition columns in the read paths (so this issue does not happen);
    
https://github.com/maropu/spark/blob/ad30aded7e95bb51d2028a4a21998c72c0338b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L210
    
https://github.com/maropu/spark/blob/ad30aded7e95bb51d2028a4a21998c72c0338b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L255
    
    In stream sinks, since we assume the partition columns are selected from the data columns (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L116), this issue does not happen.
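
    To illustrate that sink-side assumption, here is a minimal streaming-write sketch (the paths, schema, and column names are placeholders of my own, not from the PR):
    ```
    import org.apache.spark.sql.types.{IntegerType, StructType}

    // The source schema is supplied by the user; no partition columns here.
    val input = spark.readStream
      .schema(new StructType().add("a", IntegerType).add("b", IntegerType))
      .parquet("/tmp/stream-in")

    // `partitionBy` picks partition columns from the output columns themselves,
    // so a partition column can't collide with a separate data column.
    val query = input.writeStream
      .format("parquet")
      .option("path", "/tmp/stream-out")
      .option("checkpointLocation", "/tmp/stream-chk")
      .partitionBy("a")
      .start()
    ```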
    
    So, IMHO the duplication check in `getOrInferFileFormatSchema` is enough for this issue.
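
    For reference, here is a sketch of the warn-instead-of-fail pattern that the truncated hunk above introduces inside `getOrInferFileFormatSchema`; the `catch` clause and the log call are my assumption of how the block completes, not a quote of the PR:
    ```
    // Duplicates across the data schema and the partition schema only log a
    // warning, because earlier Spark releases allowed them (SPARK-18108, SPARK-21144).
    try {
      SchemaUtils.checkColumnNameDuplication(
        (dataSchema ++ partitionSchema).map(_.name),
        "in the data schema and the partition schema",
        equality)
    } catch {
      case e: AnalysisException => logWarning(e.getMessage)
    }
    ```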

