naveenp2708 commented on issue #54378:
URL: https://github.com/apache/spark/issues/54378#issuecomment-4002254394

   I've been investigating this issue and traced the root cause to 
KeyGroupedPartitioning.satisfies0() returning true for ClusteredDistribution 
even when partial clustering has split the partition across multiple tasks. 
SPARK-53074 fixed the pre-join case (operators between scan and join), but this 
post-join case remains unaddressed.
   The core problem: after a partially-clustered SPJ join, the output inherits 
KeyGroupedPartitioning which EnsureRequirements considers as satisfying 
ClusteredDistribution. No Exchange is inserted before downstream dedup 
operators (dropDuplicates, Window with row_number), so each split task 
independently deduplicates its subset — producing inflated results.
   I'm working on a fix that adds an isPartiallyClustered flag to 
KeyGroupedPartitioning so that satisfies0(ClusteredDistribution) returns false 
when the partition was split, triggering the necessary Exchange. Plain SPJ 
joins without dedup remain unaffected. Will submit a JIRA and PR shortly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to