GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/19471

    [SPARK-22245][SQL] partitioned data set should always put partition columns 
at the end

    ## Background
    In Spark SQL, partition columns always appear at the end of the schema, even with a user-specified schema:
    ```
    scala> Seq(1->1).toDF("i", "j").write.partitionBy("i").parquet("/tmp/t")
    
    scala> spark.read.parquet("/tmp/t").show
    +---+---+
    |  j|  i|
    +---+---+
    |  1|  1|
    +---+---+
    
    scala> spark.read.schema("i int, j int").parquet("/tmp/t").show
    +---+---+
    |  j|  i|
    +---+---+
    |  1|  1|
    +---+---+
    
    scala> spark.read.schema("j int, i int").parquet("/tmp/t").show
    +---+---+
    |  j|  i|
    +---+---+
    |  1|  1|
    +---+---+
    ```
    
    This behavior also aligns with tables:
    ```
    scala> sql("create table t(i int, j int) using parquet partitioned by (i)")
    res5: org.apache.spark.sql.DataFrame = []
    
    scala> spark.table("t").printSchema
    root
     |-- j: integer (nullable = true)
     |-- i: integer (nullable = true)
    ```
    
    However, for historical reasons, Spark SQL supports partition columns appearing in data files; it respects the order of the partition columns in the data schema but picks their values from the partition directories:
    ```
    scala> Seq(1->1, 2 -> 1).toDF("i", "j").write.parquet("/tmp/t/i=1")
    
    // You can see that the value of column i is always 1, so the values of the partition
    // columns are picked from the partition directories.
    scala> spark.read.parquet("/tmp/t").show
    17/10/11 16:28:28 WARN DataSource: Found duplicate column(s) in the data 
schema and the partition schema: `i`;
    +---+---+
    |  i|  j|
    +---+---+
    |  1|  1|
    |  1|  1|
    +---+---+
    ```
    
    The behavior in this case is a little weird and causes problems when dealing with tables (with Hive metastore):
    ```
    // With user-specified schema, partition columns are always at the end now.
    scala> spark.read.schema("i int, j int").parquet("/tmp/t").show
    +---+---+
    |  j|  i|
    +---+---+
    |  1|  1|
    |  1|  1|
    +---+---+
    
    scala> spark.read.schema("j int, i int").parquet("/tmp/t").show
    +---+---+
    |  j|  i|
    +---+---+
    |  1|  1|
    |  1|  1|
    +---+---+
    
    // `skipHiveMetadata=true` simulates a hive-incompatible schema.
    scala> sql("create table t using parquet options(skipHiveMetadata=true) 
location '/tmp/t'")
    17/10/11 16:57:00 WARN DataSource: Found duplicate column(s) in the data 
schema and the partition schema: `i`;
    17/10/11 16:57:00 WARN HiveExternalCatalog: Persisting data source table `default`.`t` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
    java.lang.AssertionError: assertion failed
      at scala.Predef$.assert(Predef.scala:156)
      at 
org.apache.spark.sql.catalyst.catalog.CatalogTable.partitionSchema(interface.scala:242)
      at 
org.apache.spark.sql.hive.HiveExternalCatalog.newSparkSQLSpecificMetastoreTable$1(HiveExternalCatalog.scala:299)
    ...
    ```
    
    The reason for this bug is that, when we respect the order of partition columns in the data schema, we end up with an invalid table schema that breaks the assumption that partition columns are at the end.
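
    Below is a minimal, self-contained sketch of the invariant that `CatalogTable.partitionSchema` relies on (an illustration, not the actual Spark source): partition columns are assumed to be the trailing fields of the table schema, so keeping a partition column at its data-schema position trips the assertion shown in the stack trace above.
    ```
    // Simplified model of the invariant behind CatalogTable.partitionSchema.
    // The class and field names here are illustrative, not real Spark classes.
    case class SimpleTable(schemaFieldNames: Seq[String], partitionColumnNames: Seq[String]) {
      def partitionSchema: Seq[String] = {
        // Partition columns are assumed to be the last fields of the schema.
        val trailing = schemaFieldNames.takeRight(partitionColumnNames.length)
        assert(trailing == partitionColumnNames, "partition columns must come last")
        trailing
      }
    }

    object InvariantDemo extends App {
      // OK: partition column `i` is at the end of the schema.
      println(SimpleTable(Seq("j", "i"), Seq("i")).partitionSchema)  // List(i)
      // Fails with AssertionError: `i` kept at its data-schema position.
      println(SimpleTable(Seq("i", "j"), Seq("i")).partitionSchema)
    }
    ```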
    
    ## Proposal
    My proposal is to completely ignore the partition columns in data files, so that we get a consistent behavior: partition columns are always at the end.
    
    One problem is that we then don't have the correct data/physical schema and may fail to read non-self-describing file formats like CSV. I think this is really a corner case (having overlapping columns in the data and partition schemas), and a table schema can't have overlapping columns in the data and partition schemas anyway (unless we hack it into table properties), so we'd better make `DataFrameReader` and the table API consistent and exclude partition columns from the data schema, as sketched below.
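
    As a rough illustration of the proposal (a sketch under my own naming, not the actual patch; the helper `combineSchemas` is made up), the idea is to drop any data-schema column that also appears in the partition schema and append the partition columns at the end:
    ```
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper sketching the proposed schema resolution: columns that
    // overlap with the partition schema are excluded from the data schema, and
    // the partition columns are appended, so they always end up at the end.
    def combineSchemas(dataSchema: StructType, partitionSchema: StructType): StructType = {
      val partitionNames = partitionSchema.fieldNames.map(_.toLowerCase).toSet
      val dataOnly = dataSchema.fields.filterNot(f => partitionNames.contains(f.name.toLowerCase))
      StructType(dataOnly ++ partitionSchema.fields)
    }
    ```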
    
    ## Changed behavior
    No behavior change if there are no overlapping columns in the data and partition schemas.
    
    The schema changes (partition columns move to the end) when reading a file-format data source whose data files contain partition columns.
    
    Overlapping columns in the data and partition schemas are no longer supported for non-self-describing file formats (only CSV AFAIK).
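
    For the overlapping-column example above, the expected result after this change (my reading of the proposal, not output captured from the patched build) is that reading the files matches the table behavior, with the partition column at the end:
    ```
    scala> spark.read.parquet("/tmp/t").printSchema
    root
     |-- j: integer (nullable = true)
     |-- i: integer (nullable = true)
    ```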

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark partition

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19471.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19471
    
----
commit ac7ae6b6149afd630e788a9fb42e6c4b25e84e17
Author: Wenchen Fan <[email protected]>
Date:   2017-10-11T07:36:01Z

    always put partition columns at the end

----


---
