GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/20765

    [SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

    This is a backport of #20598.
    
    ## What changes were proposed in this pull request?
    
    Solved two bugs to enable stream-stream self joins.
    
    ### Incorrect analysis due to missing MultiInstanceRelation trait
    Streaming leaf nodes did not extend MultiInstanceRelation, which is 
necessary for the catalyst analyzer to convert the self-join logical plan DAG 
into a tree (by creating new instances of the leaf relations). This was causing 
the error `Failure when resolving conflicting references in Join:` (see JIRA 
for details).
    
    ### Incorrect attribute rewrite when splicing batch plans in 
MicroBatchExecution
    When splicing the source's batch plan into the streaming plan (by replacing 
the StreamingExecutionPlan), we were rewriting the attribute reference in the 
streaming plan with the new attribute references from the batch plan. This was 
incorrectly handling the scenario when multiple StreamingExecutionRelation 
point to the same source, and therefore eventually point to the same batch plan 
returned by the source. Here is an example query, and its corresponding plan 
transformations.
    ```
    val df = input.toDF
    val join =
          df.select('value % 5 as "key", 'value).join(
            df.select('value % 5 as "key", 'value), "key")
    ```
    Streaming logical plan before splicing the batch plan
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- StreamingExecutionRelation Memory[#1], value#1
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- StreamingExecutionRelation Memory[#1], value#12  // two different 
leaves pointing to same source
    ```
    Batch logical plan after splicing the batch plan and before rewriting
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- LocalRelation [value#66]           // replaces 
StreamingExecutionRelation Memory[#1], value#1
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- LocalRelation [value#66]           // replaces 
StreamingExecutionRelation Memory[#1], value#12
    ```
    Batch logical plan after rewriting the attributes. Specifically, for 
spliced, the new output attributes (value#66) replace the earlier output 
attributes (value#12, and value#1, one for each StreamingExecutionRelation).
    ```
    Project [key#6, value#66, value#66]       // both value#1 and value#12 
replaces by value#66
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#66 % 5) AS key#6, value#66]
       :  +- LocalRelation [value#66]
       +- Project [(value#66 % 5) AS key#9, value#66]
          +- LocalRelation [value#66]
    ```
    This causes the optimizer to eliminate value#66 from one side of the join.
    ```
    Project [key#6, value#66, value#66]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#66 % 5) AS key#6, value#66]
       :  +- LocalRelation [value#66]
       +- Project [(value#66 % 5) AS key#9]   // this does not generate value, 
incorrect join results
          +- LocalRelation [value#66]
    ```
    
    **Solution**: Instead of rewriting attributes, use a Project to introduce 
aliases between the output attribute references and the new reference generated 
by the spliced plans. The analyzer and optimizer will take care of the rest.
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- Project [value#66 AS value#1]   // solution: project with aliases
       :     +- LocalRelation [value#66]
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- Project [value#66 AS value#12]    // solution: project with aliases
             +- LocalRelation [value#66]
    ```
    
    ## How was this patch tested?
    New unit test


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-23406-2.3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20765.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20765
    
----
commit c3ec9ef9355a3290d764dda0191165eaa4e49062
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-02-14T22:27:02Z

    [SPARK-23406][SS] Enable stream-stream self-joins
    
    ## What changes were proposed in this pull request?
    
    Solved two bugs to enable stream-stream self joins.
    
    ### Incorrect analysis due to missing MultiInstanceRelation trait
    Streaming leaf nodes did not extend MultiInstanceRelation, which is 
necessary for the catalyst analyzer to convert the self-join logical plan DAG 
into a tree (by creating new instances of the leaf relations). This was causing 
the error `Failure when resolving conflicting references in Join:` (see JIRA 
for details).
    
    ### Incorrect attribute rewrite when splicing batch plans in 
MicroBatchExecution
    When splicing the source's batch plan into the streaming plan (by replacing 
the StreamingExecutionPlan), we were rewriting the attribute reference in the 
streaming plan with the new attribute references from the batch plan. This was 
incorrectly handling the scenario when multiple StreamingExecutionRelation 
point to the same source, and therefore eventually point to the same batch plan 
returned by the source. Here is an example query, and its corresponding plan 
transformations.
    ```
    val df = input.toDF
    val join =
          df.select('value % 5 as "key", 'value).join(
            df.select('value % 5 as "key", 'value), "key")
    ```
    Streaming logical plan before splicing the batch plan
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- StreamingExecutionRelation Memory[#1], value#1
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- StreamingExecutionRelation Memory[#1], value#12  // two different 
leaves pointing to same source
    ```
    Batch logical plan after splicing the batch plan and before rewriting
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- LocalRelation [value#66]           // replaces 
StreamingExecutionRelation Memory[#1], value#1
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- LocalRelation [value#66]           // replaces 
StreamingExecutionRelation Memory[#1], value#12
    ```
    Batch logical plan after rewriting the attributes. Specifically, for 
spliced, the new output attributes (value#66) replace the earlier output 
attributes (value#12, and value#1, one for each StreamingExecutionRelation).
    ```
    Project [key#6, value#66, value#66]       // both value#1 and value#12 
replaces by value#66
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#66 % 5) AS key#6, value#66]
       :  +- LocalRelation [value#66]
       +- Project [(value#66 % 5) AS key#9, value#66]
          +- LocalRelation [value#66]
    ```
    This causes the optimizer to eliminate value#66 from one side of the join.
    ```
    Project [key#6, value#66, value#66]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#66 % 5) AS key#6, value#66]
       :  +- LocalRelation [value#66]
       +- Project [(value#66 % 5) AS key#9]   // this does not generate value, 
incorrect join results
          +- LocalRelation [value#66]
    ```
    
    **Solution**: Instead of rewriting attributes, use a Project to introduce 
aliases between the output attribute references and the new reference generated 
by the spliced plans. The analyzer and optimizer will take care of the rest.
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- Project [value#66 AS value#1]   // solution: project with aliases
       :     +- LocalRelation [value#66]
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- Project [value#66 AS value#12]    // solution: project with aliases
             +- LocalRelation [value#66]
    ```
    
    ## How was this patch tested?
    New unit test
    
    Author: Tathagata Das <tathagata.das1...@gmail.com>
    
    Closes #20598 from tdas/SPARK-23406.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to