ueshin opened a new pull request #34053:
URL: https://github.com/apache/spark/pull/34053
### What changes were proposed in this pull request?
Proposes infrastructure for as-of joins and implements `ps.merge_asof` on top
of it.
1. Introduce `AsOfJoin` logical plan
2. Rewrite the plan in the optimize phase:
- From something like (the SQL syntax is not determined yet):
```sql
SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
```
- To
```sql
SELECT left.*, __right__.*
FROM (
  SELECT
    left.*,
    (
      SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
      FROM right
      WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
    ) AS __right__
  FROM left
)
```
3. The rewritten scalar-subquery will be handled by the existing
decorrelation framework.
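
To make the rewrite in step 2 concrete, here is a minimal pure-Python sketch of the semantics the rewritten query expresses. It is not the Catalyst implementation: the function `asof_join`, the dict-based rows, and the column name `t` are hypothetical and exist only for illustration. `condition` stands for the extra join condition, and `tolerance` may be omitted.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def asof_join(
    left: List[Row],
    right: List[Row],
    condition: Callable[[Row, Row], bool],
    tolerance: Optional[Any] = None,
) -> List[Tuple[Row, Optional[Row]]]:
    """Illustrative as-of join: pair each left row with the closest right row
    whose `t` is at or before the left row's `t` (hypothetical helper)."""
    joined = []
    for l in left:
        # WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
        candidates = [
            r
            for r in right
            if condition(l, r)
            and l["t"] >= r["t"]
            and (tolerance is None or r["t"] >= l["t"] - tolerance)
        ]
        # MIN_BY(STRUCT(right.*), left.t - right.t): smallest gap wins;
        # no candidate means the right side stays NULL (None here).
        best = min(candidates, key=lambda r: l["t"] - r["t"], default=None)
        joined.append((l, best))
    return joined


# Made-up rows, only to exercise the sketch.
left = [{"k": "a", "t": 10}, {"k": "a", "t": 3}, {"k": "b", "t": 5}]
right = [
    {"k": "a", "t": 1, "v": "x"},
    {"k": "a", "t": 8, "v": "y"},
    {"k": "b", "t": 9, "v": "z"},
]
same_key = lambda l, r: l["k"] == r["k"]

for l, r in asof_join(left, right, same_key, tolerance=2):
    print(l, "->", r)
```

Every left row is kept even when no right row qualifies, which mirrors the scalar subquery returning NULL for `__right__`.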
Note: APIs on SQL DataFrames and SQL syntax are TBD
([SPARK-22947](https://issues.apache.org/jira/browse/SPARK-22947)), although
there are temporary APIs added here.
### Why are the changes needed?
An as-of join, such as pandas' `merge_asof`, is useful for time series
analysis in SQL and DataFrame workloads.
### Does this PR introduce _any_ user-facing change?
Yes. `ps.merge_asof` can be used.
```py
>>> quotes
                     time ticker     bid     ask
0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
>>> trades
                     time ticker   price  quantity
0 2016-05-25 13:30:00.023   MSFT   51.95        75
1 2016-05-25 13:30:00.038   MSFT   51.95       155
2 2016-05-25 13:30:00.048   GOOG  720.77       100
3 2016-05-25 13:30:00.048   GOOG  720.92       100
4 2016-05-25 13:30:00.048   AAPL   98.00       100
>>> ps.merge_asof(
...     trades, quotes, on="time", by="ticker"
... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                     time ticker   price  quantity     bid     ask
0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
>>> ps.merge_asof(
...     trades,
...     quotes,
...     on="time",
...     by="ticker",
...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                     time ticker   price  quantity     bid     ask
0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
```
Note: Since `IntervalType` literals are not supported yet, the `IntervalType`
value has to be specified with `F.expr` as a workaround.
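
The effect of the 2 ms tolerance on the MSFT trade at 13:30:00.038 can be checked by hand. The plain-Python sketch below uses only the MSFT quote times from the example; the helper `at_ms` and the variable names are hypothetical, and the comparison assumes the tolerance bound is inclusive, as in the rewritten `WHERE` clause above.

```python
from datetime import datetime, timedelta


def at_ms(ms: int) -> datetime:
    """Build 2016-05-25 13:30:00.<ms>, the timestamps used in the example."""
    return datetime(2016, 5, 25, 13, 30, 0, ms * 1000)


# MSFT quote times taken from the `quotes` frame above.
msft_quote_times = [at_ms(23), at_ms(30), at_ms(41), at_ms(75)]
trade_time = at_ms(38)
tolerance = timedelta(milliseconds=2)

# Latest quote at or before the trade (the backward as-of candidate).
latest_quote = max(q for q in msft_quote_times if q <= trade_time)
gap = trade_time - latest_quote

# right.t >= left.t - tolerance is equivalent to gap <= tolerance.
matched = gap <= tolerance
print(latest_quote.time(), gap, matched)
```

The latest preceding quote is the one at 13:30:00.030, which is 8 ms before the trade, so it falls outside the tolerance and `bid`/`ask` come back as `NaN` in the second result.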
### How was this patch tested?
Added tests.