yadavay-amzn opened a new pull request, #56291: URL: https://github.com/apache/spark/pull/56291
### What changes were proposed in this pull request?

Extends `SegmentTreeWindowFunctionFrame` (introduced in [SPARK-56546](https://issues.apache.org/jira/browse/SPARK-56546) for sliding aggregates) to also handle **shrinking** frames of the form `... ROWS/RANGE BETWEEN <lower> AND UNBOUNDED FOLLOWING`.

The class is parameterized with `ubound: Option[BoundOrdering]` (`None` = shrinking, `Some(ub)` = sliding) and a `fallbackFactory` for the small-partition path. As a result, the same machinery (build, spill via `TaskMemoryManager`, eligibility allowlist, SQLMetrics) serves both frame shapes.

The dispatcher in `WindowEvaluatorFactoryBase` gains a shrinking-frame branch that consults the existing `eligibleForSegTree` gate and, when the gate passes, builds the unified frame with `ubound = None`.

### Why are the changes needed?

The legacy `UnboundedFollowingWindowFunctionFrame` recomputes the suffix aggregate from scratch for every output row, which costs O(n · (n - 1) / 2) per partition. Its own scaladoc acknowledges this (`WindowFunctionFrame.scala:636`):

> This is a very expensive operator to use, O(n * (n - 1) / 2), because we need to maintain a buffer and must do full recalculation after each row.

The segment tree built by SPARK-56546 already supports arbitrary `[lower, upper)` queries. A shrinking frame is simply the query `[lower, n)` over an n-row partition, so routing shrinking frames into the tree is purely a dispatch and parameter change.

Shrinking frames are common in retention, cohort, and "remaining-lifetime" analytics: revenue from this row onward, future churn risk at this event, monthly revenue from here forward. For partitions of 100K+ rows (e.g. a single user's lifetime in a transactional table), the legacy O(N²) path is infeasible.

### Does this PR introduce _any_ user-facing change?

No.

- Same opt-in conf: `spark.sql.window.segmentTree.enabled` (default `false`).
- Same eligibility allowlist (`DeclarativeAggregate` with `mergeExpressions`, no FILTER, no DISTINCT).
- Same `minPartitionRows` fallback.
  The fallback frame type now depends on the frame shape: `SlidingWindowFunctionFrame` for sliding frames, `UnboundedFollowingWindowFunctionFrame` for shrinking frames.
- No analyzer, SQL grammar, or plan-shape changes.

### How was this patch tested?

**New tests**: `UnboundedFollowingSegmentTreeSuite` (26 tests, all green), mirroring the structure of `SegmentTreeWindowFunctionSuite`:

- Basic aggregates: MIN / MAX / SUM / COUNT / AVG over `BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING`
- ROWS lower-bound variations: `5 PRECEDING`, `5 FOLLOWING`, both-unbounded, multiple aggregates sharing one frame
- Partition edge cases: single-row, empty, small-partition fallback (`minPartitionRows=1024`)
- NULL / NaN / Infinity propagation
- Type coverage: Int / Long / Double / Decimal / String / Date / Timestamp
- Allowlist fallbacks: `collect_list` (not a `DeclarativeAggregate`), DISTINCT rejected by the analyzer
- RANGE frames: uniform, non-uniform gaps, tie groups at the lower edge, NULL order keys (NULLS FIRST/LAST), INTERVAL offsets on Timestamp
- Feature flag off: legacy semantics preserved

**Existing tests**: all still green:

- `SegmentTreeWindowFunctionSuite` (41 sliding tests)
- `WindowSegmentTreeSuite`, `WindowSegmentTreePropertySuite`, `WindowSegmentTreeMemorySuite`, `SegmentTreeWindowMetricsSuite`, `WindowSegmentTreeAllowlistSuite` (50 segtree-adjacent tests)
- `DataFrameWindowFunctionsSuite` (55 high-level window tests)

172 tests total, 0 failures.
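The following is a minimal single-partition sketch of why one tree can serve both frame shapes. It assumes SUM over `Long` values and ROWS offsets only, and ignores RANGE frames, NULLs, spilling, and the allowlist. `SumSegmentTree`, `segTreeSums`, and `naiveSuffixSums` are hypothetical names, not Spark classes. The `ubound: Option[Int]` parameter only mirrors the idea behind the PR's `ubound: Option[BoundOrdering]` (`None` = shrinking, `Some(k)` = sliding). `naiveSuffixSums` is a simplified stand-in for the legacy full recalculation, not its actual code.

```scala
// Hypothetical sketch: NOT Spark's SegmentTreeWindowFunctionFrame.
// Models one partition, SUM over Long values, ROWS frames only.
object ShrinkingFrameSketch {

  /** Iterative (bottom-up) sum segment tree answering half-open range queries [lo, hi). */
  final class SumSegmentTree(values: Array[Long]) {
    private val n = values.length
    private val tree = new Array[Long](2 * math.max(n, 1))
    // Leaves live at tree(n until 2n); internal node i combines children 2i and 2i+1.
    for (i <- 0 until n) tree(n + i) = values(i)
    for (i <- n - 1 to 1 by -1) tree(i) = tree(2 * i) + tree(2 * i + 1)

    /** Sum of values(lo until hi) in O(log n) steps. */
    def query(lo: Int, hi: Int): Long = {
      var l = lo + n
      var r = hi + n
      var acc = 0L
      while (l < r) {
        if ((l & 1) == 1) { acc += tree(l); l += 1 }
        if ((r & 1) == 1) { r -= 1; acc += tree(r) }
        l >>= 1
        r >>= 1
      }
      acc
    }
  }

  private def clamp(x: Int, n: Int): Int = math.min(math.max(x, 0), n)

  /** Per-row SUM for ROWS BETWEEN `lower` AND <upper>, using one tree for both shapes.
    * `lower`: row offset (-k = k PRECEDING, 0 = CURRENT ROW, +k = k FOLLOWING).
    * `ubound`: None = UNBOUNDED FOLLOWING (shrinking), Some(k) = inclusive upper row offset (sliding).
    * An empty frame yields 0 here; SQL SUM over an empty frame would return NULL. */
  def segTreeSums(values: Array[Long], lower: Int, ubound: Option[Int]): Array[Long] = {
    val n = values.length
    val tree = new SumSegmentTree(values)
    Array.tabulate(n) { i =>
      val lo = clamp(i + lower, n)
      val hi = ubound match {
        case None    => n                   // shrinking frame: query the suffix [lo, n)
        case Some(k) => clamp(i + k + 1, n) // sliding frame: query [lo, i + k + 1)
      }
      if (lo < hi) tree.query(lo, hi) else 0L
    }
  }

  /** Baseline in the spirit of the legacy frame: rescan the whole suffix for every row. */
  def naiveSuffixSums(values: Array[Long], lower: Int): Array[Long] = {
    val n = values.length
    Array.tabulate(n) { i =>
      var acc = 0L
      var j = clamp(i + lower, n)
      while (j < n) { acc += values(j); j += 1 }
      acc
    }
  }
}
```

For `Array(1L, 2L, 3L, 4L, 5L)` with `lower = 0` (`CURRENT ROW`) and `ubound = None`, both functions return the suffix sums `[15, 14, 12, 9, 5]`. With `lower = -1` and `ubound = Some(1)`, the same tree yields the sliding sums `[3, 6, 9, 12, 9]`. The only difference between the shapes is the upper end of the query. The naive baseline touches roughly n²/2 elements, while the tree does O(log n) work per row.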
**Benchmark**: `UnboundedFollowingWindowBenchmark` on EC2 c5.4xlarge (Intel Xeon 8259CL @ 2.50GHz, OpenJDK 17.0.19+10).

Single-partition `SUM(v) OVER (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)`:

| N    | naive (best) | segtree (best) | speedup |
|------|-------------:|---------------:|--------:|
| 5K   | 620 ms       | 73 ms          | 8.5×    |
| 10K  | 2,471 ms     | 110 ms         | 22.5×   |
| 25K  | 14,259 ms    | 119 ms         | 119.3×  |
| 50K  | 57,022 ms    | 181 ms         | 314.2×  |
| 100K | (~4 min)     | 269 ms         | n/a     |
| 200K | (~16 min)    | 480 ms         | n/a     |

The naive curve is cleanly O(N²): 5× N (10K → 50K) gives ~23× time, close to the 25× an ideal quadratic curve predicts. The segtree curve is near-linear (2× N from 100K to 200K gives 1.8× time), consistent with O(log N) work per row.

Per-aggregate results at N=10K:

| Aggregate | naive    | segtree | speedup |
|-----------|---------:|--------:|--------:|
| SUM       | 2,471 ms | 110 ms  | 22.5×   |
| MIN       | 2,417 ms | 215 ms  | 11.2×   |
| MAX       | 2,396 ms | 228 ms  | 10.5×   |
| COUNT     | 2,203 ms | 80 ms   | 27.4×   |
| AVG       | 2,886 ms | 84 ms   | 34.5×   |

Stdev < 5% across all cases. The full benchmark results are checked in at `sql/core/benchmarks/UnboundedFollowingWindowBenchmark-results.txt`.

### Was this patch authored or co-authored using generative AI tooling?

Yes. Authored with assistance from Claude (Anthropic).
