GitHub user tdas opened a pull request:
https://github.com/apache/spark/pull/19271
[SPARK-22053][SS] Stream-stream inner join
## What changes were proposed in this pull request?
This PR implements stream-stream inner join using a two-way symmetric hash
join. At a high level, we want to do the following.
1. For each stream, we maintain the past rows as state in State Store.
- For each joining key, there can be multiple rows that have been
received.
- So, we have to effectively maintain a key-to-list-of-values multimap as
state for each stream.
2. In each batch, for each input row in each stream
- Look up the other streams state to see if there are matching rows, and
output them if they satisfy the joining condition
- Add the input row to the corresponding stream's state.
- If the data has a timestamp/window column with a watermark, then we will
use that to calculate the threshold for keys that are required to be buffered
for future matches, and drop the rest from the state.
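The per-batch logic in the steps above can be sketched in plain Scala. This is an illustrative simulation only — `Row`, `processBatch`, `evict`, and the mutable maps standing in for the per-stream State Store are hypothetical names, not the actual `StateStore` API:

```scala
import scala.collection.mutable

// Simplified row shape for illustration: (join key, event time in millis).
type Row = (String, Long)

// One key -> buffered-rows multimap per stream, standing in for the
// key-to-list-of-values state each side keeps in the State Store.
val leftState = mutable.Map.empty[String, mutable.Buffer[Row]]
val rightState = mutable.Map.empty[String, mutable.Buffer[Row]]

// Process one micro-batch of rows arriving on one side: join each row
// against the OTHER side's buffered state, then buffer the row itself.
def processBatch(
    batch: Seq[Row],
    myState: mutable.Map[String, mutable.Buffer[Row]],
    otherState: mutable.Map[String, mutable.Buffer[Row]],
    condition: (Row, Row) => Boolean): Seq[(Row, Row)] = {
  batch.flatMap { row =>
    val matches = otherState.getOrElse(row._1, mutable.Buffer.empty[Row])
      .filter(other => condition(row, other))
      .map(other => (row, other))
    myState.getOrElseUpdate(row._1, mutable.Buffer.empty[Row]) += row
    matches
  }
}

// Watermark-based cleanup: drop buffered rows whose event time is below
// the computed threshold, removing keys whose buffers become empty.
def evict(state: mutable.Map[String, mutable.Buffer[Row]], threshold: Long): Unit = {
  state.keys.toList.foreach { k =>
    val kept = state(k).filter(_._2 >= threshold)
    if (kept.isEmpty) state.remove(k) else state(k) = kept
  }
}
```

Running both directions of `processBatch` in each micro-batch gives the two-way symmetric hash join: each side probes the other's state and then appends to its own.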
Cleaning up old, unnecessary state rows depends entirely on whether a
watermark has been defined and what the join conditions are. We definitely want
to support state cleanup for two types of queries that are likely to be common.
- Queries with time-range conditions - E.g. `SELECT * FROM leftTable,
rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES
AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key - E.g. `SELECT * FROM leftTable,
rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") =
window(rightTime, "1 hour")` (pseudo-SQL)
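For the first (time-range) example, the cleanup threshold can be derived from the condition bounds. The sketch below is a simplified illustration, not the actual Spark code: it assumes both streams share a single event-time watermark, and `canDropLeft`/`canDropRight` are hypothetical helper names:

```scala
// Condition: leftTime > rightTime - 8 min  AND  leftTime < rightTime + 1 hour.
// All future rows on either side have event time >= watermark.
val eightMinMs = 8L * 60 * 1000
val oneHourMs = 60L * 60 * 1000

// A left row matches right rows with rightTime < leftTime + 8 min.
// Once watermark >= leftTime + 8 min, no future right row can match it.
def canDropLeft(leftTime: Long, watermark: Long): Boolean =
  leftTime + eightMinMs < watermark

// A right row matches left rows with leftTime < rightTime + 1 hour.
// Once watermark >= rightTime + 1 hour, no future left row can match it.
def canDropRight(rightTime: Long, watermark: Long): Boolean =
  rightTime + oneHourMs < watermark
```

Note the asymmetry: the left side can be cleaned up much more aggressively (8 minutes of buffering) than the right side (1 hour), because each side's retention is bounded by how far the other stream's time can run ahead of it under the join condition.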
Refer to the scaladocs on the `StreamingSymmetricHashJoinExec` class for
more implementation details.
Besides the implementation of the stream-stream inner join SparkPlan, some
additional changes are
- Allowed inner join in append mode in UnsupportedOperationChecker
- Prevented a stream-stream join on an empty batch DataFrame from being
collapsed by the optimizer
## How was this patch tested?
- New tests in StreamingJoinSuite
- Updated tests in UnsupportedOperationSuite
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark SPARK-22053
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19271.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #19271
----
commit 747c8da89d257618bbbfa51b70144911933366e3
Author: Tathagata Das <[email protected]>
Date: 2017-09-18T22:30:46Z
Stream stream inner join first commit
commit 9edaa580edc6e599b01033190a9ef383ed001ae9
Author: Tathagata Das <[email protected]>
Date: 2017-09-18T23:40:34Z
Updated test
----