anirudh83 opened a new pull request, #54021:
URL: https://github.com/apache/spark/pull/54021

### What changes were proposed in this pull request?

Sort the output array in CollectList.eval() and CollectSet.eval() using PhysicalDataType.ordering(child.dataType) to ensure deterministic element ordering. This follows the same pattern used by CollectTopK.eval().

The change affects two methods in collect.scala:

- CollectList.eval(): sorts the buffer array before wrapping it in GenericArrayData
- CollectSet.eval(): sorts the result array before wrapping it in GenericArrayData

Golden SQL test result files (group-by.sql.out, scalar-subquery-select.sql.out) are updated to reflect the now-sorted output.
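
As a rough illustration of the idea, here is a minimal plain-Scala sketch. It is not the code in collect.scala: `SortedCollectSketch`, `evalList`, and `evalSet` are hypothetical names, and an implicit `Ordering[T]` stands in for the ordering the PR obtains from PhysicalDataType.ordering(child.dataType).

```scala
// Plain-Scala model of "sort before returning"; no Spark classes are used.
object SortedCollectSketch {
  // Hypothetical stand-in for CollectList.eval(): keep duplicates, sort the buffer.
  def evalList[T](buffer: Seq[T])(implicit ord: Ordering[T]): Vector[T] =
    buffer.sorted.toVector

  // Hypothetical stand-in for CollectSet.eval(): drop duplicates, then sort.
  def evalSet[T](buffer: Seq[T])(implicit ord: Ordering[T]): Vector[T] =
    buffer.distinct.sorted.toVector

  def main(args: Array[String]): Unit = {
    println(evalList(Seq(1, 2, 1)))    // Vector(1, 1, 2)
    println(evalSet(Seq(3, 1, 3, 2)))  // Vector(1, 2, 3)
  }
}
```

In this model, any insertion order of the same values yields the same result, which is the property the PR relies on.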
             
                                                                                
             
### Why are the changes needed?

When AQE is enabled with spark.sql.objectHashAggregate.sortBased.fallbackThreshold=1, array_agg(DISTINCT) produces wrong results in correlated subqueries. The root cause:

1. array_agg(DISTINCT x) resolves to CollectList, with DISTINCT handled by adding the distinct columns to the grouping keys in AggUtils.planAggregateWithOneDistinct.
2. CollectList.eval() returns elements in buffer insertion order.
3. The sort-based aggregation fallback in ObjectAggregationIterator merges partial buffers, causing non-deterministic element order across runs.
4. Decorrelation rewrites correlated subqueries into joins (DecorrelateInnerQuery) that compare array results for equality.
5. GenericArrayData.equals()/hashCode() are order-sensitive, so arrays with the same elements in different orders are treated as unequal, causing the join to produce zero matches instead of the correct result.
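
The failure mode in steps 3 to 5 can be modeled without Spark. The sketch below is an assumption-laden simplification: `MergeOrderSketch`, `merge`, and `equalityJoin` are hypothetical names, merging is modeled as plain concatenation in arrival order, and `Vector` equality stands in for the order-sensitive GenericArrayData.equals().

```scala
// Plain-Scala model of the root cause; it does not use Spark's aggregation code.
object MergeOrderSketch {
  // Assumption: merging partial buffers concatenates them in arrival order.
  def merge[T](partials: Seq[Seq[T]]): Vector[T] = partials.flatten.toVector

  // Equality join over collected arrays: keeps pairs whose arrays compare equal.
  def equalityJoin[T](left: Seq[Vector[T]], right: Seq[Vector[T]]): Seq[(Vector[T], Vector[T])] =
    for (l <- left; r <- right if l == r) yield (l, r)

  def main(args: Array[String]): Unit = {
    val (p1, p2) = (Seq(1, 2), Seq(3))
    val outer = merge(Seq(p1, p2)) // Vector(1, 2, 3)
    val inner = merge(Seq(p2, p1)) // Vector(3, 1, 2): same elements, other merge order

    println(equalityJoin(Seq(outer), Seq(inner)).size)               // 0
    println(equalityJoin(Seq(outer.sorted), Seq(inner.sorted)).size) // 1
  }
}
```

Sorting both sides into a canonical order is what turns the zero-match join into the expected single match in this model.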
             
                                                                                
             
Both collect_list and collect_set explicitly document that their output order is non-deterministic, so sorting does not violate their contract.

### Does this PR introduce any user-facing change?

Yes. collect_list and collect_set now return elements in sorted order instead of insertion order. Both functions already document that their output order is non-deterministic and should not be relied upon, so this is a bug fix rather than a behavior change in terms of the documented API contract.

Before: SELECT collect_list(col) FROM VALUES (1), (2), (1) AS tab(col) → [1,2,1]

After: SELECT collect_list(col) FROM VALUES (1), (2), (1) AS tab(col) → [1,1,2]
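
To illustrate why callers that respect the documented contract should be unaffected, the following plain-Scala sketch compares the before and after results from the example above as multisets. `OrderInsensitiveCheck` and `asMultiset` are hypothetical helpers introduced only for this illustration.

```scala
// Order-insensitive comparison of the "before" and "after" outputs shown above.
object OrderInsensitiveCheck {
  // Counts occurrences per element, so order is ignored but duplicates are kept.
  def asMultiset[T](xs: Seq[T]): Map[T, Int] =
    xs.groupBy(identity).map { case (k, v) => k -> v.size }

  def main(args: Array[String]): Unit = {
    val before = Seq(1, 2, 1) // insertion order
    val after = Seq(1, 1, 2)  // sorted order

    println(before == after)                         // false: order-sensitive
    println(asMultiset(before) == asMultiset(after)) // true: same contents
  }
}
```

Only code that compared results position by position would observe a difference between the two outputs.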
                                                                                
             
### How was this patch tested?

- Added a regression test in SubquerySuite that verifies array_agg(DISTINCT) produces consistent equality results with AQE enabled and spark.sql.objectHashAggregate.sortBased.fallbackThreshold=1.
- Ran DataFrameAggregateSuite: all 127 tests pass.
- Ran the group-by.sql and scalar-subquery-select.sql golden SQL tests: all pass with the updated expected output.

### Was this patch authored or co-authored using generative AI tooling?

No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

