GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/19271

    [SPARK-22053][SS] Stream-stream inner join

    ## What changes were proposed in this pull request?
    
    This PR implements stream-stream inner join using a two-way symmetric hash
    join. At a high level, we want to do the following.
    1. For each stream, we maintain the past rows as state in the State Store.
      - For each joining key, there can be multiple rows that have been
        received.
      - So, we effectively have to maintain a key-to-list-of-values multimap as
        state for each stream.
    2. In each batch, for each input row in each stream:
      - Look up the other stream's state to see if there are matching rows, and
        output them if they satisfy the joining condition.
      - Add the input row to the corresponding stream's state.
      - If the data has a timestamp/window column with a watermark, then we
        use that to calculate the threshold for keys that are required to be
        buffered for future matches, and drop the rest from the state.
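
The steps above can be sketched as a toy in-memory model. This is only an illustration of the symmetric hash join idea, not the code in this PR: the class `SymmetricHashJoin`, its methods, and the plain dictionaries standing in for the State Store are all hypothetical names chosen for the example.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Toy in-memory model of a two-way symmetric hash inner join (hypothetical)."""

    def __init__(self, condition=lambda left, right: True):
        # One key -> list-of-rows multimap per side, standing in for the State Store.
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}
        # Extra join condition evaluated on (left_row, right_row) pairs.
        self.condition = condition

    def process(self, side, key, row):
        """Join one input row against the other side's state, then buffer it."""
        other = "right" if side == "left" else "left"
        output = []
        # .get() avoids creating empty entries in the other side's multimap.
        for buffered in self.state[other].get(key, []):
            left, right = (row, buffered) if side == "left" else (buffered, row)
            if self.condition(left, right):
                output.append((key, left, right))
        # Keep the row so that later rows from the other stream can match it.
        self.state[side][key].append(row)
        return output

    def evict(self, side, keep):
        """Drop buffered rows on one side for which keep(row) is false."""
        for key in list(self.state[side]):
            kept = [r for r in self.state[side][key] if keep(r)]
            if kept:
                self.state[side][key] = kept
            else:
                del self.state[side][key]
```

Calling `process("left", key, row)` and `process("right", key, row)` in arrival order emits each matching pair exactly once, because a row is only joined against rows that were buffered before it. `evict` models the watermark-driven cleanup with a caller-supplied predicate.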
    
    Cleaning up old, unnecessary state rows depends entirely on whether a
    watermark has been defined and what the join conditions are. We definitely
    want to support state cleanup for two types of queries that are likely to
    be common.
    
    - Queries with time range conditions - E.g. `SELECT * FROM leftTable JOIN
      rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8
      MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
    - Queries with windows as the matching key - E.g. `SELECT * FROM leftTable
      JOIN rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") =
      window(rightTime, "1 hour")` (pseudo-SQL)
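
As a worked example of how a watermark could bound the state for these two query shapes, here is a minimal sketch. It assumes a watermark `W` means no future row will have an event time at or below `W`; the function names are hypothetical, and the arithmetic is derived from the example conditions above rather than taken from the implementation in this PR.

```python
from datetime import datetime, timedelta

# Bounds taken from the example time range condition:
#   leftTime > rightTime - 8 minutes AND leftTime < rightTime + 1 hour
LOWER = timedelta(minutes=8)
UPPER = timedelta(hours=1)


def left_state_threshold(watermark):
    """Left rows with leftTime <= the returned value can be dropped.

    Future right rows have rightTime > watermark, so they can only match
    left rows with leftTime > rightTime - 8 minutes > watermark - 8 minutes.
    """
    return watermark - LOWER


def right_state_threshold(watermark):
    """Right rows with rightTime <= the returned value can be dropped.

    Future left rows have leftTime > watermark, so they can only match
    right rows with rightTime > leftTime - 1 hour > watermark - 1 hour.
    """
    return watermark - UPPER


def window_key_expired(window_end, watermark):
    """A window key cannot match again once the watermark reaches its end."""
    return window_end <= watermark
```

For instance, with a watermark of 12:00, this sketch would keep left rows newer than 11:52 and right rows newer than 11:00, and would treat the 11:00 to 12:00 window key as expired.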
    
    Refer to the scaladocs on the `StreamingSymmetricHashJoinExec` class for
    more implementation details.
    
    Besides the implementation of the stream-stream inner join SparkPlan, some
    additional changes are:
    - Allowed inner join in append mode in UnsupportedOperationChecker
    - Prevented a stream-stream join on an empty batch DataFrame from being
      collapsed by the optimizer
    
    ## How was this patch tested?
    - New tests in StreamingJoinSuite
    - Updated tests in UnsupportedOperationSuite


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-22053

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19271.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19271
    
----
commit 747c8da89d257618bbbfa51b70144911933366e3
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-09-18T22:30:46Z

    Stream stream inner join first commit

commit 9edaa580edc6e599b01033190a9ef383ed001ae9
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-09-18T23:40:34Z

    Updated test

----

