GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/5203

    [SPARK-6550][SQL] Add PreAnalyzer to keep logical plan consistent across 
DataFrame

    ## Problems
    
    In some cases, the expressions in a logical plan are rewritten to new ones during analysis, e.g. in the handling of self-join cases. If some expressions are then resolved against the analyzed plan, they refer to the rewritten expression ids, not the original ones.
    
    But DataFrame transformations such as `groupBy` and aggregation use the logical plan to construct the new DataFrame. So in such cases, the expressions in these DataFrames become inconsistent.
    
    The problems are described as follows:
    
    * Expression ids in the logical plan can become inconsistent if they are changed during analysis while some expressions are resolved only afterwards
    
    When we run the following code:
    
        val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
        val df2 = df.as('x).join(df.as('y), $"x.str" === 
$"y.str").groupBy("y.str").min("y.int")
    
    
    Because `groupBy` and `min` resolve against the analyzed logical plan, their expression ids refer to the analyzed plan instead of the logical plan.
    
    So the logical plan of `df2` looks like:
    
        'Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
         'Join Inner, Some(('x.str = 'y.str))
          Subquery x
           Project [_1#0 AS int#2,_2#1 AS str#3]
            LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
          Subquery y
           Project [_1#0 AS int#2,_2#1 AS str#3]
            LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
    
    
    As you can see, the expression ids in `Aggregate` differ from the expression ids in `Subquery y`. This is the first problem.
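    The mismatch can be modeled outside Spark. The sketch below is a toy model with hypothetical names (`Attr`, `newAttr`, `dedupRight` are illustrative, not Catalyst classes): attributes match by expression id rather than by name, and the analyzer's self-join handling re-aliases one side with fresh ids, so anything resolved after that step captures ids the logical plan no longer has.

```scala
// Toy model of Catalyst expression ids (hypothetical names, not the
// Spark API): attributes are compared by exprId, not by name.
object ExprIds {
  private var counter = 0
  private def fresh(): Int = { counter += 1; counter }

  case class Attr(name: String, exprId: Int)

  def newAttr(name: String): Attr = Attr(name, fresh())

  // Mimics the analyzer's self-join handling: the right side of the join
  // gets fresh expression ids to disambiguate duplicate attributes.
  def dedupRight(attrs: Seq[Attr]): Seq[Attr] = attrs.map(a => newAttr(a.name))
}

object Demo extends App {
  import ExprIds._
  val logicalStr = newAttr("str")                    // e.g. str#3 in the plan above
  val Seq(analyzedStr) = dedupRight(Seq(logicalStr)) // re-aliased with a fresh id
  // Same name, different ids: a reference captured from the analyzed plan
  // no longer matches the attribute kept in the logical plan.
  assert(logicalStr.name == analyzedStr.name)
  assert(logicalStr.exprId != analyzedStr.exprId)
}
```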
    
    * `df2` can't be executed
    
    The logical plan of `df2` shown above can't be executed. Because the expression ids of `Subquery y` are rewritten during analysis for self-join handling, the analyzed plan of `df2` becomes:
    
    
        Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
         Join Inner, Some((str#3 = str#8))
          Subquery x
           Project [_1#0 AS int#2,_2#1 AS str#3]
            LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
          Subquery y
           Project [_1#0 AS int#7,_2#1 AS str#8]
            LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
    
    
    The expressions referred to in `Aggregate` don't match those in `Subquery y`. This is the second problem.
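    Using the ids from the plan dump above, the failure can be sketched as a lookup by expression id (a simplified stand-in for attribute binding, not the actual Catalyst code): `Aggregate` still references `str#5`, but the rewritten `Subquery y` only outputs `str#8`, so the reference cannot be bound to any child attribute.

```scala
// Simplified stand-in for attribute binding (not the Catalyst API):
// a reference binds to a child output attribute only if the exprId matches.
case class Attr(name: String, exprId: Int)

def bind(ref: Attr, childOutput: Seq[Attr]): Option[Attr] =
  childOutput.find(_.exprId == ref.exprId)

object Demo2 extends App {
  val aggRef      = Attr("str", 5)                       // what Aggregate refers to
  val childOutput = Seq(Attr("int", 7), Attr("str", 8))  // what Subquery y outputs
  // The name matches but the id doesn't, so binding fails
  // and the plan can't be executed.
  assert(bind(aggRef, childOutput).isEmpty)
}
```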
    
    ## Proposed solution
    
    We propose to add a `PreAnalyzer`. When a logical plan `rawPlan` is given to `SQLContext`, the `PreAnalyzer` modifies the logical plan before it is assigned to `QueryExecution.logical`. Later operations are then based on the pre-analyzed logical plan instead of the original `rawPlan`.
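    The intended flow can be sketched as follows. This is a toy model, not the patch itself: the names here are hypothetical, and the real `PreAnalyzer` would presumably be a Catalyst rule executor. The key idea is that the id-changing rewrites run once, up front, so later resolution (`groupBy`, `min`, ...) and full analysis both start from the same, already-stabilized plan.

```scala
// Toy model of the proposed flow (hypothetical names; not the real patch).
case class Attr(name: String, exprId: Int)
case class Plan(output: Seq[Attr])

object PreAnalyzerSketch {
  type Rule = Plan => Plan

  // Apply the id-changing rules once, before the plan is stored as
  // QueryExecution.logical, so that subsequent resolution and full
  // analysis both see the same expression ids.
  def preAnalyze(rawPlan: Plan, rules: Seq[Rule]): Plan =
    rules.foldLeft(rawPlan)((plan, rule) => rule(plan))
}

object Demo3 extends App {
  import PreAnalyzerSketch._
  // A rule standing in for self-join deduplication: re-number the ids.
  val dedup: Rule = p => Plan(p.output.map(a => a.copy(exprId = a.exprId + 5)))
  val rawPlan = Plan(Seq(Attr("int", 2), Attr("str", 3)))
  val stable  = preAnalyze(rawPlan, Seq(dedup))
  // Later operations resolve against `stable`, and the analyzer also starts
  // from `stable`, so the ids stay consistent across the DataFrames.
  assert(stable.output.map(_.exprId) == Seq(7, 8))
}
```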


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 pre_analyze

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5203
    
----
commit 77ba3a6dba84928935cf18c2e4d9b40a03ac404f
Author: Liang-Chi Hsieh <[email protected]>
Date:   2015-03-26T08:27:52Z

    Add PreAnalyzer.

----

