yadavay-amzn opened a new pull request, #56291:
URL: https://github.com/apache/spark/pull/56291

   ### What changes were proposed in this pull request?
   
   Extends `SegmentTreeWindowFunctionFrame` (introduced in 
[SPARK-56546](https://issues.apache.org/jira/browse/SPARK-56546) for sliding 
aggregates) to also handle **shrinking** frames of the form `... ROWS/RANGE 
BETWEEN <lower> AND UNBOUNDED FOLLOWING`. The class is parameterized with 
`ubound: Option[BoundOrdering]` (`None` = shrinking, `Some(ub)` = sliding) and 
a `fallbackFactory` for the small-partition path so the same machinery (build, 
spill via `TaskMemoryManager`, eligibility allowlist, SQLMetrics) serves both 
shapes.
   
   The dispatcher in `WindowEvaluatorFactoryBase` gains a shrinking-frame 
branch that consults the existing `eligibleForSegTree` gate and, on success, 
builds the unified frame with `ubound = None`.
   
   ### Why are the changes needed?
   
   The legacy `UnboundedFollowingWindowFunctionFrame` recomputes the suffix 
aggregate from scratch for every output row, which is O(n · (n - 1) / 2). Its 
own scaladoc acknowledges this (`WindowFunctionFrame.scala:636`):
   
   > This is a very expensive operator to use, O(n * (n - 1) / 2), because we 
need to maintain a buffer and must do full recalculation after each row.
   
   The segment tree built by SPARK-56546 already supports arbitrary `[lower, 
upper)` queries; routing shrinking frames into it is purely a dispatch + 
parameter change.
   
   Shrinking frames are common in retention / cohort / "remaining-lifetime" 
analytics: revenue-from-this-row-onward, future-churn-risk-at-this-event, 
monthly-revenue-from-here-forward. For partitions of 100K+ rows (for example, a 
single user's lifetime in a transactional table), the legacy O(N²) path takes 
minutes per partition.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   - Same opt-in conf: `spark.sql.window.segmentTree.enabled` (default `false`).
   - Same eligibility allowlist (DeclarativeAggregate with `mergeExpressions`, 
no FILTER, no DISTINCT).
   - Same `minPartitionRows` fallback. The fallback frame type now depends on 
the frame shape: `SlidingWindowFunctionFrame` for sliding frames, 
`UnboundedFollowingWindowFunctionFrame` for shrinking frames.
   - No analyzer / SQL grammar / plan-shape changes.
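
The selection rules listed above can be summarized as a small decision function. This is a simplified Python model, not the code in `WindowEvaluatorFactoryBase`: `Aggregate`, `eligible`, and `choose_frame` are hypothetical names, the requirement that every aggregate in the frame be eligible is an assumption, the strict `<` comparison against `minPartitionRows` is an assumption, and the real small-partition fallback is described as going through `fallbackFactory` rather than a single up-front check.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Aggregate:
    """Hypothetical stand-in for the properties the allowlist inspects."""
    is_declarative_with_merge: bool  # DeclarativeAggregate with mergeExpressions
    has_filter: bool = False         # FILTER (WHERE ...) clause present
    is_distinct: bool = False        # DISTINCT aggregate


def eligible(agg: Aggregate) -> bool:
    # Allowlist from the PR description: declarative + mergeable, no FILTER, no DISTINCT.
    return agg.is_declarative_with_merge and not agg.has_filter and not agg.is_distinct


def choose_frame(
    enabled: bool,                # spark.sql.window.segmentTree.enabled
    aggs: Sequence[Aggregate],
    partition_rows: int,
    min_partition_rows: int,
    upper_bound: Optional[str],   # None models UNBOUNDED FOLLOWING (shrinking)
) -> str:
    # The legacy frame depends on the frame shape.
    legacy = (
        "UnboundedFollowingWindowFunctionFrame"
        if upper_bound is None
        else "SlidingWindowFunctionFrame"
    )
    if not enabled or not all(eligible(a) for a in aggs):
        return legacy
    if partition_rows < min_partition_rows:  # assumed strict comparison
        return legacy
    return "SegmentTreeWindowFunctionFrame"


if __name__ == "__main__":
    sum_agg = Aggregate(is_declarative_with_merge=True)
    print(choose_frame(True, [sum_agg], 100_000, 1024, None))
    print(choose_frame(True, [sum_agg], 10, 1024, None))
    print(choose_frame(False, [sum_agg], 100_000, 1024, "5 FOLLOWING"))
```

With the conf left at its default of `false`, every call returns the legacy frame, which matches the claim that there is no user-facing change.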
   
   ### How was this patch tested?
   
   **New tests** — `UnboundedFollowingSegmentTreeSuite` (26 tests, all green), 
mirroring `SegmentTreeWindowFunctionSuite`'s structure:
   - Basic aggregates: MIN / MAX / SUM / COUNT / AVG over `BETWEEN CURRENT ROW 
AND UNBOUNDED FOLLOWING`
   - ROWS lower-bound variations: `5 PRECEDING`, `5 FOLLOWING`, both-unbounded, 
multi-aggregate shared frame
   - Partition edge cases: single-row, empty, small-partition fallback 
(`minPartitionRows=1024`)
   - NULL / NaN / Infinity propagation
   - Type coverage: Int / Long / Double / Decimal / String / Date / Timestamp
   - Allowlist fallbacks: `collect_list` (non-DeclarativeAggregate), DISTINCT 
analyzer rejection
   - RANGE frames: uniform, non-uniform gaps, tie groups at lower edge, NULL 
order keys (NULLS FIRST/LAST), INTERVAL Timestamp offsets
   - Feature-flag off: legacy semantics preserved
   
   **Existing tests** — all still green:
   - `SegmentTreeWindowFunctionSuite` (41 sliding tests)
   - `WindowSegmentTreeSuite`, `WindowSegmentTreePropertySuite`, 
`WindowSegmentTreeMemorySuite`, `SegmentTreeWindowMetricsSuite`, 
`WindowSegmentTreeAllowlistSuite` (50 segtree-adjacent tests)
   - `DataFrameWindowFunctionsSuite` (55 high-level window tests)
   
   172 tests total, 0 failures.
   
   **Benchmark** — `UnboundedFollowingWindowBenchmark` on EC2 c5.4xlarge (Intel 
Xeon 8259CL @ 2.50GHz, OpenJDK 17.0.19+10):
   
   Single-partition `SUM(v) OVER (ORDER BY id ROWS BETWEEN CURRENT ROW AND 
UNBOUNDED FOLLOWING)`:
   
   | N    | naive (best) | segtree (best) | speedup |
   |------|-------------:|---------------:|--------:|
   | 5K   | 620 ms       | 73 ms          | 8.5×    |
   | 10K  | 2,471 ms     | 110 ms         | 22.5×   |
   | 25K  | 14,259 ms    | 119 ms         | 119.3×  |
   | 50K  | 57,022 ms    | 181 ms         | 314.2×  |
   | 100K | (~4 min)     | 269 ms         | —       |
   | 200K | (~16 min)    | 480 ms         | —       |
   
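   Naive time grows quadratically (5× N from 10K to 50K → ~23× time, against 
25× for ideal N²); segtree time grows sub-linearly over the measured range (2× N 
from 100K to 200K → 1.8× time), which suggests fixed overhead still outweighs 
the logarithmic per-row query cost at these sizes.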
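
The growth ratios can be rechecked from the table. The Python below is only an arithmetic aid: it uses the "best" timings reported above, and the helper names are hypothetical. It compares the measured growth with what ideal N² and N·log N models would predict for the same step in N.

```python
import math

# Best-run timings in milliseconds, copied from the single-partition SUM table.
naive_ms = {5_000: 620, 10_000: 2_471, 25_000: 14_259, 50_000: 57_022}
segtree_ms = {
    5_000: 73, 10_000: 110, 25_000: 119,
    50_000: 181, 100_000: 269, 200_000: 480,
}


def growth(times: dict, n_from: int, n_to: int) -> float:
    """Measured time ratio when N goes from n_from to n_to."""
    return times[n_to] / times[n_from]


def predicted_quadratic(n_from: int, n_to: int) -> float:
    """Time ratio an ideal O(N^2) algorithm would show."""
    return (n_to / n_from) ** 2


def predicted_n_log_n(n_from: int, n_to: int) -> float:
    """Time ratio an ideal O(N log N) algorithm would show."""
    return (n_to * math.log2(n_to)) / (n_from * math.log2(n_from))


if __name__ == "__main__":
    # Naive path, 10K -> 50K: measured about 23.1x against 25x for pure N^2.
    print(round(growth(naive_ms, 10_000, 50_000), 1), predicted_quadratic(10_000, 50_000))
    # Segtree path, 100K -> 200K: measured about 1.8x against about 2.1x for pure N log N.
    print(round(growth(segtree_ms, 100_000, 200_000), 1),
          round(predicted_n_log_n(100_000, 200_000), 2))
```

The segtree ratio falling below the N·log N prediction is what one would expect if constant costs (job setup, tree build, result materialization) are still a large share of the runtime at these sizes.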
   
   Per-aggregate at N=10K:
   
   | Aggregate | naive    | segtree | speedup |
   |-----------|---------:|--------:|--------:|
   | SUM       | 2,471 ms | 110 ms  | 22.5×   |
   | MIN       | 2,417 ms | 215 ms  | 11.2×   |
   | MAX       | 2,396 ms | 228 ms  | 10.5×   |
   | COUNT     | 2,203 ms | 80 ms   | 27.4×   |
   | AVG       | 2,886 ms | 84 ms   | 34.5×   |
   
   Stdev < 5% across all cases. Full benchmark file checked in at 
`sql/core/benchmarks/UnboundedFollowingWindowBenchmark-results.txt`.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes. Authored with assistance from Claude (Anthropic).
   

