Github user liancheng commented on the pull request:
https://github.com/apache/spark/pull/12952#issuecomment-220104678
@marmbrus Sorry for the late reply. This issue is a little bit complicated.
I've written up the following (long) summary for explanation and future reference.
----
Encoders look up fields by name. Namely, within a Dataset, the schema of its
encoder and the schema of its underlying logical plan may have different field
orders. This is true for both top-level fields and inner nested struct fields.
An encoder is able to adjust field order at runtime according to its own schema
because its expressions are bound by name at analysis time.
We observe that encoders have the following characteristics (a short sketch
follows the list):
1. Encoders are somewhat similar to projections in that they are able to adjust
field order, but projections only handle top-level fields.
1. You can't alias encoder field names, as they are either hard-coded
(primitive encoders always have the single field name "value") or decided by
case class/Java bean field names.
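For example, a minimal sketch of by-name binding (assuming a Spark shell with
`sqlContext.implicits._` imported):
```scala
case class T(a: Int, b: String)

// Column order in the DataFrame is (b, a)...
val df = Seq(("foo", 1)).toDF("b", "a")

// ...but the encoder binds by name at analysis time, so T.a picks up
// column "a" and T.b picks up column "b" regardless of their positions.
val ds = df.as[T]
ds.collect()  // Array(T(1,foo))
```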
## Datasets and encoders in Spark 1.6
In Spark 1.6, for a Dataset `ds`, we have the following facts:
1. `ds.schema == ds.resolvedTEncoder.schema`
1. `ds.toDF().schema == ds.logicalPlan.schema`
Together with the two aforementioned characteristics, we have the following
inconsistent results under Spark 1.6:
1. Datasets with a single primitive field
```scala
val df1 = sqlContext.range(3)
df1.schema
// StructType(StructField(id,LongType,false))
val ds1 = df1.as[Long]
ds1.schema
// StructType(StructField(value,LongType,false))
//                        ~~~~~
//                        Conforms to encoder schema
ds1.printSchema()
// root
// |-- id: long (nullable = false)
//     ~~
//     Conforms to logical plan schema, as `printSchema()`
//     delegates to `toDF().printSchema()`
ds1.show()
// +-----+
// |value| <-- Conforms to encoder schema
// +-----+
// |    0|
// |    1|
// |    2|
// +-----+
ds1.toDF().schema
// StructType(StructField(id,LongType,false))
//                        ~~
//                        Conforms to logical plan schema
```
1. Datasets of case classes
```scala
val df2 = Seq("foo" -> 1, "bar" -> 2).toDF("b", "a")
df2.schema
// StructType(
//   StructField(b,StringType,true),
//   StructField(a,IntegerType,false)
// )
case class T(a: Int, b: String)
val ds2 = df2.as[T]
ds2.schema
// StructType(                       (1) Fields reordered by encoder
//   StructField(a,IntegerType,false),
//   StructField(b,StringType,true)
// )
ds2.printSchema()
// root                              (2) Field order inconsistent with (1)
// |-- b: string (nullable = true)
// |-- a: integer (nullable = false)
ds2.show()
// +---+---+
// |  a|  b|
// +---+---+
// |  1|foo|
// |  2|bar|
// +---+---+
ds2.toDF().show()
// +---+---+
// |  b|  a|
// +---+---+
// |foo|  1|
// |bar|  2|
// +---+---+
```
Conclusions:
1. It's a bug that `ds.schema` can be inconsistent with `ds.printSchema()`.
1. When `ds.schema` is inconsistent with `ds.logicalPlan.schema`, we may
end up with inconsistent column order and/or inconsistent column names. However:
   - At least the data are always correct.
   - These inconsistencies don't cause much trouble, as the Dataset API
   in Spark 1.6 has quite limited features.
## Issues introduced in Spark 2.0
In Spark 2.0, two issues are closely related to SPARK-15112:
1. While unifying the DataFrame and Dataset APIs, `ds.schema` was accidentally
changed to `ds.logicalPlan.schema` instead of `ds.resolvedTEncoder.schema`.
This change fixes the inconsistency between `ds.schema` and
`ds.printSchema()`, as now both of them use the logical plan schema.
However, it led to an even worse inconsistency:
```scala
val df3 = Seq("foo" -> 1, "bar" -> 2).toDF("b", "a")
df3.show()
// +---+---+
// |  b|  a|
// +---+---+
// |foo|  1|
// |bar|  2|
// +---+---+
case class T(a: Int, b: String)
val ds3 = df3.as[T]
ds3.show()
// +---+---+
// |  b|  a|      <-- Order of field names is inconsistent
// +---+---+          with the field data now.
// |  1|foo|
// |  2|bar|
// +---+---+
```
This is because `ds3.show()` uses `ds3.schema`, which now equals
`ds3.logicalPlan.schema`, to render the field names, while the field order
in the data rows is adjusted by the encoder.
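To make the mechanics concrete, here is a schematic sketch of where the two
halves of `show()`'s output come from (an illustration, not the actual
`Dataset.showString` code path):
```scala
// The header comes from the schema, which after the change is the logical
// plan schema:
ds3.schema.fieldNames  // Array(b, a)

// The data rows, however, are produced through the encoder, which emits
// fields in case class declaration order: (a, b). The two disagree, so
// values land under the wrong column names.
```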
1. `EmbedSerializerInFilter` implicitly assumes that de/serializer pairs
don't change field order
The `EmbedSerializerInFilter` optimization rule may optimize plan
fragments like
```
SerializeFromObject
  Filter
    DeserializeToObject   (1)
      <child-plan>        (2)
```
into
```
Filter
  <child-plan>
```
by embedding the deserializer expression into the `Filter` condition
expression. Namely, when filtering an input row, the new `Filter` operator
first deserializes the input row into a Scala object, and then uses that
object as the argument to invoke the user-provided Scala predicate function.
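Schematically (mirroring the fragment notation above), the rewritten operator
looks like:
```
Filter <user-predicate>(<deserializer>)
  <child-plan>
```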
The problem here is that, in the original plan fragment, the output field
order at (1) may differ from the field order at (2), because
`DeserializeToObject` may adjust field order using the encoder's
deserialization expressions.
However, in the simplified fragment, `Filter.output` simply inherits the
child plan's output, while the deserialization expressions resolved by the
encoder still adjust field order as before. Thus `Filter` may output corrupted
data because of the wrong input column order.
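A minimal sketch of a query shape that exercises this rule (assuming
`spark.implicits._` is in scope and the Spark 2.0 snapshot behavior described
above):
```scala
case class T(a: Int, b: String)

// Encoder field order (a, b) differs from the plan's column order (b, a).
val ds = Seq("foo" -> 1, "bar" -> 2).toDF("b", "a").as[T]

// filter() plans SerializeFromObject / Filter / DeserializeToObject.
// EmbedSerializerInFilter collapses this into a bare Filter whose output
// inherits the child order (b, a), while the embedded deserializer still
// reads columns in encoder order (a, b), so downstream operators may see
// corrupted rows.
ds.filter(_.a > 0).show()
```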
## Fixes tried in this PR and why they don't work
### Dataset schema issue
For the first issue, this PR tried to restore the Spark 1.6 behavior, namely
making `ds.schema == ds.resolvedTEncoder.schema` again.
This doesn't work because many more operations have been migrated from the
DataFrame API to the Dataset API, and these operations don't play well with the
old Dataset schema constraint. Here are some examples:
#### Hard-coded primitive encoder field name
Primitive encoders (e.g. `Encoder[Long]`) always have a single field
named "value".
```scala
spark.range(10).schema
// StructType(StructField(value,LongType,false))
//                        ~~~~~
//                        Conforms to encoder schema
val df4 = Seq(Tuple1("foo")).toDF("str")
df4.schema
// StructType(StructField(str,StringType,true))
df4.as[String].schema
// StructType(StructField(value,StringType,true))
//                        ~~~~~
//                        Conforms to encoder schema
```
#### Nullability mismatch
```scala
val df5 = Seq.empty[Tuple1[Option[Int]]].toDF("i")
val df6 = Seq.empty[Tuple1[Int]].toDF("i")
val df7 = df5.intersect(df6)
df7.schema
// StructType(StructField(i,IntegerType,true))
//                                      ~~~~
```
Field `i` in `df7` is expected to be non-nullable according to
`Intersect.output`, but is now nullable because `df7.schema` is decided by the
encoder schema (inherited from `df5`) rather than by `df7.logicalPlan.schema`.
Thus the nullability change made by `Intersect` is overwritten.
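As a paraphrase of the nullability rule involved (a sketch of the idea, not
the exact Spark source), an intersected column is nullable only when it is
nullable on both sides:
```scala
// Every row of an intersection occurs in both inputs, so a column can be
// null only if it can be null on BOTH sides.
def intersectNullable(leftNullable: Boolean, rightNullable: Boolean): Boolean =
  leftNullable && rightNullable

// df5's column i is nullable (Option[Int]); df6's is not (Int):
intersectNullable(leftNullable = true, rightNullable = false)  // false
```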
### `EmbedSerializerInFilter` issue
This PR tried to fix this issue by adding a projection in
`EmbedSerializerInFilter` when necessary to adjust field order. However, I soon
realized that `Project` doesn't cover inner nested fields.
Another alternative is to never eliminate the de/serializer pair when the pair
reorders fields, but this may miss lots of useful optimization opportunities.
## Proposed fix
After considering all the aforementioned cases, I think we need to meet the
following constraints to fix these issues:
1. `ds.schema` should be defined as `ds.logicalPlan.schema`
1. `ds.resolvedTEncoder.schema` and `ds.logicalPlan.schema` may differ in
field names and/or field nullability, but not in data types.
In this way, field names and nullability won't be overwritten by the encoder
schema, and we don't need to worry about issues like the one in
`EmbedSerializerInFilter`.
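A rough sketch of what constraint 2 amounts to (a hypothetical helper, not
Spark code): two schemas are compatible when they agree on data types
recursively, ignoring field names and nullability.
```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Compare two schemas on data types only, recursing into structs and
// ignoring field names and nullability. (Simplified: other container
// types are compared by plain equality here.)
def sameTypes(a: DataType, b: DataType): Boolean = (a, b) match {
  case (sa: StructType, sb: StructType) =>
    sa.fields.length == sb.fields.length &&
      sa.fields.zip(sb.fields).forall { case (fa, fb) =>
        sameTypes(fa.dataType, fb.dataType)
      }
  case _ => a == b
}
```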
To do this, we can adjust the field order (including inner nested fields) of
the logical plan when constructing a Dataset, to make sure the order is
consistent with the encoder.
One tricky aspect of our current encoder design is that it couples field
reordering with Java object de/serialization. I think it's more reasonable to
move field reordering into `Project`, namely, to make `Project` able to
handle inner nested fields.
For example, say field `a` is of type `STRING` and field `b` is of type
`STRUCT<b1: INT, b2: STRING>`. When feeding a row like
```
[b: [b2: 'foo', b1: 1], a: 'bar']
```
to
```
Project a, b
```
the projection should output
```
[a: 'bar', b: [b1: 1, b2: 'foo']]
```
Note that order of inner fields `b1` and `b2` is also adjusted accordingly.
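With today's API one can express that nested reordering manually with `struct`
(a sketch only; the hypothetical `df` below has columns `a: STRING` and
`b: STRUCT<b2: STRING, b1: INT>`, and a nested-field-aware `Project` would
generate something like this automatically):
```scala
import org.apache.spark.sql.functions.{col, struct}

// Keep a first, and rebuild b with its inner fields in the target order.
val reordered = df.select(
  col("a"),
  struct(col("b.b1").as("b1"), col("b.b2").as("b2")).as("b")
)
```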
There can be several benefits if we decouple field reordering from Java
object de/serialization:
1. It simplifies the encoder implementation, as encoders only need to handle
de/serialization.
1. It would make the aforementioned logical plan field order adjustment
trivial when constructing a Dataset.
1. It can be potentially useful for pruning nested fields (see the sketch
below).
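For instance, with nested-field handling in `Project`, the optimizer could
push reads like the following down to the data source so that `b.b2` is never
read (hypothetical behavior, not something Spark does today):
```scala
import org.apache.spark.sql.functions.col

// Only a and b.b1 are referenced; a nested-field-aware projection would
// let the scan skip b.b2 entirely.
df.select(col("a"), col("b.b1"))
```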