anirudh83 opened a new pull request, #54021:
URL: https://github.com/apache/spark/pull/54021
### What changes were proposed in this pull request?
Sort the output array in CollectList.eval() and CollectSet.eval() using
PhysicalDataType.ordering(child.dataType) to ensure deterministic element
ordering. This follows the same pattern used by CollectTopK.eval().
The change affects two methods in collect.scala:
- CollectList.eval(): sorts the buffer array before wrapping it in GenericArrayData
- CollectSet.eval(): sorts the result array before wrapping it in GenericArrayData
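As a rough illustration of the idea (not the actual Spark code), the sketch below uses plain Scala collections in place of the aggregation buffer and GenericArrayData. The names SortedCollectSketch, collectInsertionOrder, and collectSorted are hypothetical, and the implicit Ordering stands in for PhysicalDataType.ordering(child.dataType). It shows that two different merge orders of the same partial buffers yield different insertion-order results but identical sorted results.

```scala
// Hypothetical stand-in for the tail of an eval() method; this is not Spark code.
object SortedCollectSketch {
  // Concatenate partial buffers in the order they happen to be merged.
  def collectInsertionOrder[T](partials: Seq[Seq[T]]): Vector[T] =
    partials.flatten.toVector

  // Same merge, followed by a sort with the element Ordering before returning.
  // The Ordering plays the role of PhysicalDataType.ordering(child.dataType).
  def collectSorted[T](partials: Seq[Seq[T]])(implicit ord: Ordering[T]): Vector[T] =
    collectInsertionOrder(partials).sorted(ord)

  def main(args: Array[String]): Unit = {
    // Two runs that merge the same partial buffers in different orders.
    val runA = Seq(Seq(1, 2), Seq(1))
    val runB = Seq(Seq(1), Seq(1, 2))

    println(collectInsertionOrder(runA)) // Vector(1, 2, 1)
    println(collectInsertionOrder(runB)) // Vector(1, 1, 2)
    println(collectSorted(runA) == collectSorted(runB)) // true
  }
}
```

Sorting after the merge makes the result independent of merge order, at the cost of an extra sort per group.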
Golden SQL test result files (group-by.sql.out, scalar-subquery-select.sql.out)
are updated to reflect the now-sorted output.
### Why are the changes needed?
When AQE is enabled with spark.sql.objectHashAggregate.sortBased.fallbackThreshold=1,
array_agg(DISTINCT) produces wrong results in correlated subqueries. The root cause:
1. array_agg(DISTINCT x) resolves to CollectList, with DISTINCT handled by adding
the distinct columns to the grouping keys in AggUtils.planAggregateWithOneDistinct.
2. CollectList.eval() returns elements in buffer insertion order.
3. The sort-based aggregation fallback in ObjectAggregationIterator merges partial
buffers, causing non-deterministic element order across runs.
4. Decorrelation rewrites correlated subqueries into joins (DecorrelateInnerQuery)
that compare array results for equality.
5. GenericArrayData.equals()/hashCode() are order-sensitive, so arrays with the same
elements in different orders are treated as unequal, causing the join to produce
zero matches instead of the correct result.
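Steps 4 and 5 can be sketched in plain Scala. This is only an illustration under stated assumptions: Vector[Int] stands in for GenericArrayData (both compare element by element, in order), and OrderSensitiveJoinSketch and joinOnArrayKey are hypothetical names, not Spark APIs. The join finds no match when the two sides hold the same elements in different orders, and finds the match once both sides are sorted.

```scala
// Hypothetical illustration of an equality join on array-valued keys; not Spark code.
object OrderSensitiveJoinSketch {
  // Inner join that pairs rows whose array keys are equal.
  // Vector equality is structural and order-sensitive, like the array comparison
  // described in step 5.
  def joinOnArrayKey(
      left: Seq[Vector[Int]],
      right: Seq[Vector[Int]]): Seq[(Vector[Int], Vector[Int])] =
    for (l <- left; r <- right if l == r) yield (l, r)

  def main(args: Array[String]): Unit = {
    // Same elements, collected in different orders on each side of the join.
    val outer = Seq(Vector(1, 2))
    val inner = Seq(Vector(2, 1))

    println(joinOnArrayKey(outer, inner).size) // 0
    println(joinOnArrayKey(outer.map(_.sorted), inner.map(_.sorted)).size) // 1
  }
}
```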
Both collect_list and collect_set explicitly document that their output order is
non-deterministic, so sorting does not violate their contract.
### Does this PR introduce any user-facing change?
Yes. collect_list and collect_set now return elements in sorted order instead of
insertion order. Both functions already document that their output order is
non-deterministic and should not be relied upon, so this is a bug fix rather than a
behavior change in terms of the documented API contract.
Before: SELECT collect_list(col) FROM VALUES (1), (2), (1) AS tab(col) → [1,2,1]
After: SELECT collect_list(col) FROM VALUES (1), (2), (1) AS tab(col) → [1,1,2]
### How was this patch tested?
- Added a regression test in SubquerySuite that verifies array_agg(DISTINCT) produces
consistent equality results with AQE enabled and sortBased.fallbackThreshold=1.
- Ran DataFrameAggregateSuite: all 127 tests pass.
- Ran the group-by.sql and scalar-subquery-select.sql golden SQL tests: all pass with
updated expected output.
### Was this patch authored or co-authored using generative AI tooling?
No