Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/12952#issuecomment-220104678
  
    @marmbrus Sorry for the late reply. This issue is a little bit complicated. I wrote the following (long) summary as an explanation and for future reference.
    
    ----
    
    Encoders look up fields by name. This means that, within a Dataset, the schema of its encoder and the schema of its underlying logical plan may have different field orders. This is true for both top-level fields and inner nested struct fields. Encoders are able to adjust field order at runtime according to their own schema, by binding expressions by name at analysis time.
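
    As a small illustration of this name-based binding (a sketch, assuming `spark.implicits._`, or `sqlContext.implicits._` on 1.6, is in scope):

    ```scala
    case class P(x: Int, y: String)

    // The underlying plan produces columns in the order (y, x), but the
    // encoder for P binds its fields by name at analysis time, so the
    // collected objects are still paired correctly.
    val ds = Seq(P(1, "a")).toDF().select($"y", $"x").as[P]

    ds.collect()
    // Array(P(1,a))
    ```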
    
    We observe that encoders have the following characteristics:
    
    1.  Encoders are somewhat similar to projections in that they are able to adjust field order, but projections only handle top-level fields.
    1.  You can't alias encoder field names, as they are either hard-coded (primitive encoders always have the field name "value") or decided by case class/Java bean field names.
    
    ## Datasets and encoders in Spark 1.6
    
    In Spark 1.6, for a Dataset `ds`, we have the following facts:
    
    1.  `ds.schema == ds.resolvedTEncoder.schema`
    1.  `ds.toDF().schema == ds.logicalPlan.schema`
    
    Together with the two aforementioned characteristics, we have the following 
inconsistent results under Spark 1.6:
    
    1.  Datasets with a single primitive field
    
        ```scala
        val df1 = sqlContext.range(3)
    
        df1.schema
        // StructType(StructField(id,LongType,false))
    
        val ds1 = df1.as[Long]
    
        ds1.schema
        // StructType(StructField(value,LongType,false))
        //                        ~~~~~
        //                             Conforms to encoder schema
    
        ds1.printSchema()
        // root
        // |-- id: long (nullable = false)
        //     ~~
        //       Conforms to logical plan schema, as `printSchema()`
        //       delegates to `toDF().printSchema()`
    
        ds1.show()
    
        // +-----+
        // |value|  <-- Conforms to encoder schema
        // +-----+
        // |    0|
        // |    1|
        // |    2|
        // +-----+
    
        ds1.toDF().schema
        // StructType(StructField(id,LongType,false))
        //                        ~~
        //                          Conforms to logical plan schema
        ```
    
    1.  Datasets of case classes
    
        ```scala
        val df2 = Seq("foo" -> 1, "bar" -> 2).toDF("b", "a")
    
        df2.schema
        // StructType(
        //   StructField(b,StringType,true),
        //   StructField(a,IntegerType,false)
        // )
    
        case class T(a: Int, b: String)
    
        val ds2 = df2.as[T]
    
        ds2.schema
        // StructType(                          (1) Fields reordered by encoder
        //   StructField(a,IntegerType,false),
        //   StructField(b,StringType,true)
        // )
    
        ds2.printSchema()
        // root                                 (2) Field order inconsistent with (1)
        //  |-- b: string (nullable = true)
        //  |-- a: integer (nullable = false)
    
        ds2.show()
        // +---+---+
        // |  a|  b|
        // +---+---+
        // |  1|foo|
        // |  2|bar|
        // +---+---+
    
        ds2.toDF().show()
        // +---+---+
        // |  b|  a|
        // +---+---+
        // |foo|  1|
        // |bar|  2|
        // +---+---+
        ```
    
    Conclusions:
    
    1.  It's a bug that `ds.schema` can be inconsistent with `ds.printSchema()`.
    1.  When `ds.schema` is inconsistent with `ds.logicalPlan.schema`, we may end up with inconsistent column order and/or inconsistent column names. However:

        -   At least the data are always correct.
        -   These inconsistencies don't cause too much trouble, as Dataset in Spark 1.6 has quite limited features.
    
    ## Issues introduced in Spark 2.0
    
    In Spark 2.0, two issues are closely related to SPARK-15112:
    
    1.  While unifying the DataFrame and Dataset APIs, `ds.schema` was accidentally changed to `ds.logicalPlan.schema` instead of `ds.resolvedTEncoder.schema`.

        This change fixes the inconsistency between `ds.schema` and `ds.printSchema()`, as now both of them use the logical plan schema.

        However, it led to an even worse inconsistency:
    
        ```scala
        val df3 = Seq("foo" -> 1, "bar" -> 2).toDF("b", "a")
    
        df3.show()
        // +---+---+
        // |  b|  a|
        // +---+---+
        // |foo|  1|
        // |bar|  2|
        // +---+---+
    
        case class T(a: Int, b: String)
        val ds3 = df3.as[T]
    
        ds3.show()
        // +---+---+
        // |  b|  a|    <-- Order of field names is now
        // +---+---+        inconsistent with field data.
        // |  1|foo|
        // |  2|bar|
        // +---+---+
        ```
    
        This is because `ds3.show()` uses `ds3.schema`, which is equal to `ds3.logicalPlan.schema`, to show field names, but the field order of the data rows is adjusted by the encoder.
    
    1.  `EmbedSerializerInFilter` implicitly assumes that de/serializer pairs 
don't change field order
    
        The `EmbedSerializerInFilter` optimization rule may optimize plan 
fragments like
    
        ```
        SerializeFromObject
         Filter
          DeserializeToObject   (1)
           <child-plan>         (2)
        ```
    
        into
    
        ```
        Filter
         <child-plan>
        ```
    
        by embedding the deserializer expression into the `Filter` condition expression. Namely, when filtering an input row, the new `Filter` operator always deserializes the input row into a Scala object, and then uses the object as the argument to invoke the user-provided Scala predicate function.
    
        The problem here is that, in the original plan fragment, the output field order at (1) may differ from the field order at (2), because `DeserializeToObject` may adjust field order using the encoder's deserialization expressions.

        However, in the simplified fragment, `Filter.output` simply inherits the child plan's output, while the deserialization expressions resolved by the encoder still adjust field order as before. Thus `Filter` may output corrupted data because of the wrong input column order.
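
        To make the field-order hazard concrete, here is a toy model of this rewrite, with plain case classes standing in for Catalyst operators (an illustration only, not actual Spark code):

        ```scala
        // Plans are modelled only by the order of their output columns.
        sealed trait Plan { def output: Seq[String] }
        case class Child(output: Seq[String]) extends Plan
        // The deserializer may reorder columns into the encoder's field order.
        case class DeserializeToObject(child: Plan, encoderOrder: Seq[String]) extends Plan {
          def output: Seq[String] = encoderOrder
        }
        case class Filter(child: Plan) extends Plan {
          def output: Seq[String] = child.output
        }
        case class SerializeFromObject(child: Plan) extends Plan {
          def output: Seq[String] = child.output
        }

        // The simplified rule: eliminate the de/serializer pair around Filter.
        def embedSerializerInFilter(plan: Plan): Plan = plan match {
          case SerializeFromObject(Filter(DeserializeToObject(child, _))) => Filter(child)
          case other => other
        }

        val original = SerializeFromObject(
          Filter(DeserializeToObject(Child(Seq("b", "a")), Seq("a", "b"))))

        original.output
        // Seq(a, b) -- encoder order, as adjusted by the deserializer
        embedSerializerInFilter(original).output
        // Seq(b, a) -- silently reverted to child plan order
        ```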
    
    ## Fixes tried in this PR and why they don't work
    
    ### Dataset schema issue
    
    For the first issue, this PR tried to restore the behavior of Spark 1.6, namely making `ds.schema == ds.resolvedTEncoder.schema` again.

    This doesn't work because we have migrated many more operations from the DataFrame API to the Dataset API, and these operations don't play well with the old Dataset schema constraint. Here are some examples:
    
    #### Hard-coded primitive encoder field name

    Primitive encoders (e.g. `Encoder[Long]`) always have only a single field named "value".
    
    ```scala
    spark.range(10).schema
    // StructType(StructField(value,LongType,false))
    //                        ~~~~~
    //                             Conforms to encoder schema
    
    val df4 = Seq(Tuple1("foo")).toDF("str")
    
    df4.schema
    // StructType(StructField(str,StringType,true))
    
    df4.as[String].schema
    // StructType(StructField(value,StringType,true))
    //                        ~~~~~
    //                             Conforms to encoder schema
    ```
    
    #### Nullability mismatch
    
    ```scala
    val df5 = Seq.empty[Tuple1[Option[Int]]].toDF("i")
    val df6 = Seq.empty[Tuple1[Int]].toDF("i")
    val df7 = df5.intersect(df6)
    
    df7.schema
    // StructType(StructField(i,IntegerType,true))
    //                                      ~~~~
    ```
    
    Field `i` in `df7` is expected to be non-nullable (`nullable = false`) according to `Intersect.output`, but it is now nullable because `df7.schema` is decided by `df5.resolvedTEncoder.schema` rather than `df5.logicalPlan.schema`. Thus the nullability change made by `Intersect` is overwritten.
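
    For reference, `Intersect.output` derives nullability roughly as follows (a paraphrase of the logic, not the exact Spark source):

    ```scala
    import org.apache.spark.sql.catalyst.expressions.Attribute

    // An intersected column can only contain a null if both sides can,
    // so output nullability is the conjunction of the two children's flags.
    def intersectOutput(left: Seq[Attribute], right: Seq[Attribute]): Seq[Attribute] =
      left.zip(right).map { case (l, r) =>
        l.withNullability(l.nullable && r.nullable)
      }
    ```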
    
    ### `EmbedSerializerInFilter` issue
    
    This PR tried to fix this issue by adding a projection in `EmbedSerializerInFilter`, when necessary, to adjust field order. However, I soon realized that `Project` doesn't cover inner nested fields.

    Another alternative is to never eliminate the de/serializer pair when the pair reorders fields, but this may miss lots of useful optimization opportunities.
    
    
    ## Proposed fix
    
    After considering all the aforementioned cases, I think we need to meet the 
following constraints to fix these issues:
    
    1.  `ds.schema` should be defined as `ds.logicalPlan.schema`
    1.  `ds.resolvedTEncoder.schema` and `ds.logicalPlan.schema` may differ in 
field names and/or field nullability, but not in data types.
    
    In this way, field names and nullability won't be overwritten by the encoder schema, and we don't need to worry about issues like the one in `EmbedSerializerInFilter`.
    
    To do this, we can adjust the field order (including inner nested fields) of the logical plan when constructing a Dataset, to make sure the order is consistent with the encoder, as sketched below.
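
    A minimal sketch of the top-level half of this adjustment, using Catalyst's `Project` (names and error handling are illustrative only):

    ```scala
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
    import org.apache.spark.sql.types.StructType

    // Reorder the plan's top-level columns into encoder field order by name.
    // A nested-aware Project (proposed next) would extend this to inner structs.
    def reorderToEncoder(plan: LogicalPlan, encoderSchema: StructType): LogicalPlan = {
      val reordered: Seq[Attribute] =
        encoderSchema.map(f => plan.output.find(_.name == f.name).get)
      Project(reordered, plan)
    }
    ```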
    
    One tricky aspect of our current encoder design is that it couples field reordering and Java object de/serialization. I think it's more reasonable to move field reordering into `Project`, namely to make `Project` able to handle inner nested fields.
    
    For example, say field `a` is of type string and field `b` is of type `STRUCT<b1: INT, b2: STRING>`. When feeding a row like
    
    ```
    [b: [b2: 'foo', b1: 1], a: 'bar']
    ```
    
    to
    
    ```
    Project a, b
    ```
    
    the projection should output
    
    ```
    [a: 'bar', b: [b1: 1, b2: 'foo']]
    ```
    
    Note that order of inner fields `b1` and `b2` is also adjusted accordingly.
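
    For comparison, with the current API this kind of nested reordering has to be spelled out manually by rebuilding the struct (a sketch, assuming a DataFrame `df` with the schema above):

    ```scala
    import org.apache.spark.sql.functions.{col, struct}

    // Rebuild `b` with its inner fields in the desired order; a nested-aware
    // Project would perform this reordering implicitly.
    val reordered = df.select(
      col("a"),
      struct(col("b.b1").as("b1"), col("b.b2").as("b2")).as("b"))
    ```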
    
    There would be several benefits if we could decouple field reordering from Java object de/serialization:

    1.  It simplifies the encoder implementation, as it only needs to handle de/serialization.
    1.  It would be trivial to do the aforementioned logical plan field order adjustment when constructing a Dataset.
    1.  It could potentially be useful for pruning nested fields.
    