sarutak opened a new pull request, #55912:
URL: https://github.com/apache/spark/pull/55912

   ### What changes were proposed in this pull request?
   Add `SortMergeAsOfJoinExec`, a dedicated physical operator for AS-OF joins 
that replaces the existing correlated subquery rewrite (`RewriteAsOfJoin`) when 
`spark.sql.join.sortMergeAsOfJoin.enabled` is set to `true` (default `false`).
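The opt-in path can be enabled per session. The sketch below uses only the standard PySpark `SparkSession.conf.set` API. It assumes a Spark build that includes this PR; on other builds the key is not a recognized SQL conf. The equivalent SQL is `SET spark.sql.join.sortMergeAsOfJoin.enabled=true`.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("asof-join-demo").getOrCreate()

# Opt in to SortMergeAsOfJoinExec for this session (the conf defaults to "false").
spark.conf.set("spark.sql.join.sortMergeAsOfJoin.enabled", "true")

# Setting it back to "false" restores the RewriteAsOfJoin (correlated subquery) path.
spark.conf.set("spark.sql.join.sortMergeAsOfJoin.enabled", "false")
```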
   
   The operator co-partitions both sides by equi-join keys, sorts by 
(equi-keys, as-of key), and performs a single-pass merge scan per partition to 
find the nearest match for each left row. It exploits the sort order to 
terminate early, scanning in the direction that suits the join: right-to-left 
for backward joins and left-to-right for forward and nearest joins.
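The merge scan can be illustrated with a small, hypothetical Python sketch. It is not the Scala operator's code. The sketch makes three assumptions:

- Rows are plain `(equi_key, asof_value, payload)` tuples.
- Keys are non-null and mutually comparable.
- The parameter names `direction`, `tolerance`, and `allow_exact_matches` are illustrative only.

It also uses a generic two-pointer formulation over the sorted right side, rather than the direction-specific scan order described above.

```python
from typing import Any, List, Optional, Tuple

# Hypothetical row shape for this sketch: (equi_key, asof_value, payload).
Row = Tuple[Any, Any, Any]


def _sort_key(row: Row) -> Tuple[Any, Any]:
    # Both sides are ordered by (equi-key, as-of key).
    return (row[0], row[1])


def asof_merge(left: List[Row], right: List[Row], direction: str = "backward",
               tolerance: Optional[float] = None,
               allow_exact_matches: bool = True) -> List[Tuple[Any, Any, Any, Optional[Row]]]:
    """Left-outer AS-OF join of two row lists using one merge pass after sorting."""
    lsorted = sorted(left, key=_sort_key)
    rsorted = sorted(right, key=_sort_key)
    n = len(rsorted)
    lo = 0  # first right index whose (key, t) is >= the current left (key, t)
    hi = 0  # first right index whose (key, t) is > the current left (key, t)
    out = []
    for lkey, lt, lpayload in lsorted:
        target = (lkey, lt)
        # Left rows arrive in sorted order, so both pointers only ever move forward.
        while lo < n and _sort_key(rsorted[lo]) < target:
            lo += 1
        hi = max(hi, lo)
        while hi < n and _sort_key(rsorted[hi]) <= target:
            hi += 1
        # rsorted[lo:hi] are exact matches (same key, same as-of value).
        before, after = (hi - 1, lo) if allow_exact_matches else (lo - 1, hi)
        back = rsorted[before] if before >= 0 and rsorted[before][0] == lkey else None
        fwd = rsorted[after] if after < n and rsorted[after][0] == lkey else None
        if direction == "backward":
            match = back
        elif direction == "forward":
            match = fwd
        elif direction == "nearest":
            candidates = [c for c in (back, fwd) if c is not None]
            # min() keeps the first minimum, so ties go to the backward match (an assumption).
            match = min(candidates, key=lambda c: abs(lt - c[1])) if candidates else None
        else:
            raise ValueError(f"unknown direction: {direction}")
        # Tolerance bounds the as-of distance; a miss still keeps the left row (left outer).
        if match is not None and tolerance is not None and abs(lt - match[1]) > tolerance:
            match = None
        out.append((lkey, lt, lpayload, match))
    return out
```

Because the left rows arrive in (equi-key, as-of key) order, neither pointer ever moves backward. After sorting, the pass is therefore linear in the number of left plus right rows. Output rows follow the sorted left order.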
   
   Changes:
   - New physical operator: `SortMergeAsOfJoinExec`
   - New planner strategy: `AsOfJoinSelection` in `SparkStrategies`
   - Conditional skip of `RewriteAsOfJoin` when the conf is enabled
   - New SQLConf: `spark.sql.join.sortMergeAsOfJoin.enabled`
   
   ### Why are the changes needed?
   The current AS-OF join implementation rewrites `AsOfJoin` into a correlated 
scalar subquery with `MIN_BY`. Because the inequality condition 
(`left.t >= right.t`) cannot be decorrelated into an equi-join, this approach 
is O(N×M) per partition and runs out of memory (OOM) on moderate data sizes 
(100K+ rows).
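For contrast, here is the backward-join semantics that the `MIN_BY` rewrite computes, written as a hypothetical brute-force nested loop in Python. It again uses `(equi_key, asof_value, payload)` tuples and is a sketch of the semantics, not the Catalyst plan. In this sketch, every right row is examined for every left row, which is where the N×M term comes from.

```python
from typing import Any, List, Optional, Tuple

Row = Tuple[Any, Any, Any]  # hypothetical (equi_key, asof_value, payload)


def nested_loop_backward_asof(left: List[Row], right: List[Row]):
    """Backward AS-OF join by brute force; returns (joined_rows, predicate_evaluations)."""
    out = []
    evaluations = 0
    for lkey, lt, lpayload in left:
        best: Optional[Row] = None
        for r in right:
            evaluations += 1
            # Mirrors: right.key = left.key AND left.t >= right.t, then
            # MIN_BY(right, left.t - right.t) keeps the closest earlier right row.
            if r[0] == lkey and lt >= r[1] and (best is None or lt - r[1] < lt - best[1]):
                best = r
        out.append((lkey, lt, lpayload, best))
    return out, evaluations
```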
   
   The sort-merge operator is O(N+M) per partition after sorting, with early 
termination within each equi-key group.
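The "per partition" bound depends on the co-partitioning step: every row with a given equi-join key must land in the same partition on both sides. The hypothetical Python sketch below shows that step. Python's built-in `hash` modulo the partition count stands in for Spark's hash partitioning, which uses Murmur3-based hashing instead. Each partition is then sorted by (equi-key, as-of key) so that a merge scan can run on it independently.

```python
from typing import Any, List, Tuple

Row = Tuple[Any, Any, Any]  # hypothetical (equi_key, asof_value, payload)


def co_partition(left: List[Row], right: List[Row],
                 num_partitions: int) -> List[Tuple[List[Row], List[Row]]]:
    """Route both sides by a hash of the equi-join key, then sort each partition."""
    buckets: List[Tuple[List[Row], List[Row]]] = [([], []) for _ in range(num_partitions)]
    for side, rows in ((0, left), (1, right)):
        for row in rows:
            # Same key -> same hash -> same partition index on both sides.
            buckets[hash(row[0]) % num_partitions][side].append(row)
    # Sort each side by (equi-key, as-of key) so each partition can be merged on its own.
    return [
        (sorted(lrows, key=lambda x: (x[0], x[1])), sorted(rrows, key=lambda x: (x[0], x[1])))
        for lrows, rrows in buckets
    ]
```

Sorting a partition that holds n left rows and m right rows costs O(n log n + m log m). The merge that follows is linear in n + m, which matches the O(N+M)-after-sorting bound quoted above.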
   
   Speedup of the sort-merge operator over the correlated subquery rewrite on 
GitHub Actions (AMD EPYC 7763, 10K×10K rows, 100 equi-key groups):
   
   | Case | JDK 17 | JDK 21 | JDK 25 |
   |---|---|---|---|
   | With equi-key | 631.8× | 601.2× | 676.5× |
   | Without equi-key | 14.0× | 13.3× | 13.7× |
   
   For 100K×100K rows, the baseline OOMs while the sort-merge operator 
completes in ~500 ms.
   
   ### Does this PR introduce _any_ user-facing change?
   No. The feature is opt-in via a new SQLConf that defaults to `false`. When 
disabled, the existing `RewriteAsOfJoin` path is used unchanged.
   
   ### How was this patch tested?
   - `SortMergeAsOfJoinSuite`: 18 tests covering backward/forward/nearest 
directions, equi-keys, left outer, tolerance, allowExactMatches=false, empty 
partitions, null keys, multiple data types (Int/Long/Double), self join, no 
equi-key, and conf-disabled fallback
   - `AsOfJoinBenchmark`: comparative benchmark (correlated subquery vs 
sort-merge)
   - Existing `DataFrameAsOfJoinSuite`: all 11 tests pass with default conf (no 
regression)
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Generated-by: Claude (via Kiro CLI, auto model selection)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
