GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/19271
[SPARK-22053][SS] Stream-stream inner join

## What changes were proposed in this pull request?

This PR implements stream-stream inner join using a two-way symmetric hash join. At a high level, we want to do the following.

1. For each stream, we maintain the past rows as state in the State Store.
   - For each joining key, there can be multiple rows that have been received.
   - So we effectively have to maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream:
   - Look up the other stream's state to see if there are matching rows, and output them if they satisfy the join condition.
   - Add the input row to the corresponding stream's state.
   - If the data has a timestamp/window column with a watermark, use it to calculate the threshold for keys that are required to be buffered for future matches, and drop the rest from the state.

Cleaning up old, unnecessary state rows depends entirely on whether a watermark has been defined and on the join conditions. We definitely want to support state cleanup for two types of queries that are likely to be common.

- Queries with time-range conditions, e.g.
  `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key, e.g.
  `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")` (pseudo-SQL)

Refer to the scaladocs on the `StreamingSymmetricHashJoinExec` class for more implementation details.

Besides the implementation of the stream-stream inner join SparkPlan, some additional changes are:
- Allowed inner joins in Append mode in UnsupportedOperationChecker
- Prevented a stream-stream join on an empty batch DataFrame from being collapsed by the optimizer

## How was this patch tested?
- New tests in StreamingJoinSuite
- Updated tests in UnsupportedOperationSuite

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-22053

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19271.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19271

----

commit 747c8da89d257618bbbfa51b70144911933366e3
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-09-18T22:30:46Z

    Stream stream inner join first commit

commit 9edaa580edc6e599b01033190a9ef383ed001ae9
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-09-18T23:40:34Z

    Updated test
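The two-way symmetric hash join described in the PR body can be sketched in plain Scala. This is a minimal, single-process illustration of the core idea only (per-side key-to-list-of-values state, probe the other side, then append to your own); it is not the actual `StreamingSymmetricHashJoinExec`, and the class and method names here are hypothetical. State Store persistence and watermark-based state cleanup are omitted.

```scala
import scala.collection.mutable

// Sketch of a two-way symmetric hash join: each side keeps a
// key -> list-of-values multimap as its buffered "state".
class SymmetricHashJoin[K, V] {
  private val leftState = mutable.Map.empty[K, mutable.ListBuffer[V]]
  private val rightState = mutable.Map.empty[K, mutable.ListBuffer[V]]

  // Process one input row for a side: probe the other side's state for
  // matching rows, then add the row to this side's own state.
  private def process(
      own: mutable.Map[K, mutable.ListBuffer[V]],
      other: mutable.Map[K, mutable.ListBuffer[V]],
      key: K,
      value: V): Seq[(V, V)] = {
    val matches =
      other.getOrElse(key, mutable.ListBuffer.empty[V]).map(o => (value, o)).toSeq
    own.getOrElseUpdate(key, mutable.ListBuffer.empty[V]) += value
    matches
  }

  def processLeft(key: K, value: V): Seq[(V, V)] =
    process(leftState, rightState, key, value)

  // Swap so output pairs are always (leftValue, rightValue).
  def processRight(key: K, value: V): Seq[(V, V)] =
    process(rightState, leftState, key, value).map(_.swap)
}
```

Because the join is symmetric, a match is emitted regardless of which side's row arrives first; in the real operator, the watermark bounds how long each side's multimap entries must be retained.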