GitHub user viirya opened a pull request:
https://github.com/apache/spark/pull/5203
[SPARK-6550][SQL] Add PreAnalyzer to keep logical plan consistent across
DataFrame
## Problems
In some cases, the expressions in a logical plan are modified to new ones during analysis, e.g. when handling self-joins. If some expressions are then resolved against the analyzed plan, they refer to the changed expression ids rather than the original ones.
However, DataFrame transformations such as `groupBy` and aggregation construct new DataFrames from the logical plan, so in such cases the expressions in these DataFrames become inconsistent.
The problems are described as follows:
* Expression ids in the logical plan can be inconsistent if expression ids are changed during analysis and some expressions are resolved after that
When we run the following code:
```scala
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").min("y.int")
```
Because `groupBy` and `min` resolve their expressions against the analyzed logical plan, their expression ids refer to the analyzed plan instead of the logical plan.
So the logical plan of `df2` looks like:
```
'Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 'Join Inner, Some(('x.str = 'y.str))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
```
As you can see, the expression ids in `Aggregate` are different from the expression ids in `Subquery y`. This is the first problem.
* `df2` can't be executed
The logical plan of `df2` shown above can't be executed. Because the expression ids in `Subquery y` are modified for self-join handling during analysis, the analyzed plan of `df2` becomes:
```
Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 Join Inner, Some((str#3 = str#8))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#7,_2#1 AS str#8]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
```
The expressions referenced in `Aggregate` do not match those in `Subquery y`. This is the second problem.
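For reference, a quick way to reproduce the two plan dumps above is to print them from `df2` via the `queryExecution` developer API:
```scala
// In both plans, the expression ids in Aggregate do not line up with those in Subquery y.
println(df2.queryExecution.logical)   // the plan groupBy/min resolved against
println(df2.queryExecution.analyzed)  // the plan that would actually be executed
```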
## Proposed solution
We propose adding a `PreAnalyzer`. When a logical plan `rawPlan` is given to `SQLContext`, it uses the `PreAnalyzer` to modify the logical plan before assigning it to `QueryExecution.logical`. Later operations are then based on the pre-analyzed logical plan instead of the original `rawPlan`.
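For illustration, a minimal sketch of what such a `PreAnalyzer` could look like, assuming it is built on Catalyst's `RuleExecutor` in the same way as the existing `Analyzer` (the batch and its rules below are placeholders, not the actual contents of this patch):
```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Sketch only: a rule executor run over rawPlan once, before the plan is
// stored as QueryExecution.logical, so that later resolution (groupBy, min,
// ...) and the final analysis see the same expression ids.
class PreAnalyzer extends RuleExecutor[LogicalPlan] {
  val batches = Seq(
    // Placeholder batch; the real patch decides which rules run here, e.g.
    // deduplicating expression ids for self-joins up front.
    Batch("PreResolution", Once)
  )
}
```
`SQLContext` would then run this executor on `rawPlan` once and hand the result to `QueryExecution.logical`, so that `groupBy`/`min` resolve against the same plan that is later analyzed and executed.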
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/viirya/spark-1 pre_analyze
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/5203.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5203
----
commit 77ba3a6dba84928935cf18c2e4d9b40a03ac404f
Author: Liang-Chi Hsieh <[email protected]>
Date: 2015-03-26T08:27:52Z
Add PreAnalyzer.
----