naveenp2708 commented on issue #54378: URL: https://github.com/apache/spark/issues/54378#issuecomment-4002254394
I've been investigating this issue and traced the root cause to KeyGroupedPartitioning.satisfies0() returning true for ClusteredDistribution even when partial clustering has split a partition across multiple tasks. SPARK-53074 fixed the pre-join case (operators between the scan and the join), but this post-join case is still unaddressed.

The core problem: after a partially clustered SPJ join, the output inherits KeyGroupedPartitioning, which EnsureRequirements treats as satisfying ClusteredDistribution. As a result, no Exchange is inserted before downstream deduplication operators (dropDuplicates, or a Window with row_number). Each split task therefore deduplicates only its own subset of a key's rows, so the same key can survive once per task and the results are inflated.

I'm working on a fix that adds an isPartiallyClustered flag to KeyGroupedPartitioning, so that satisfies0(ClusteredDistribution) returns false when the partition was split. That triggers the necessary Exchange. Plain SPJ joins without a downstream dedup remain unaffected. I will file a JIRA and submit a PR shortly.
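The failure mode and the proposed flag can be illustrated with a toy model. This is a simplified, hypothetical Scala sketch, not Spark's planner code: the case classes only borrow the names KeyGroupedPartitioning and ClusteredDistribution, the satisfies check is reduced to a "partition keys are a subset of the clustering keys" test, and `satisfiesClustered`, `plannedDedup`, `dedupPerTask`, and `shuffleByKey` are illustrative helpers standing in for satisfies0, EnsureRequirements, dropDuplicates, and the Exchange. The isPartiallyClustered field models the proposed change and is not an existing Spark field.

```scala
// Hypothetical, simplified model of the planning decision discussed above.
// These classes only mimic the names of Spark's internal
// KeyGroupedPartitioning / ClusteredDistribution; they are NOT Spark's API.
final case class ClusteredDistribution(clusteringKeys: Seq[String])

final case class KeyGroupedPartitioning(
    keys: Seq[String],
    isPartiallyClustered: Boolean = false // the proposed flag (assumption)
) {
  // Stand-in for satisfies0(ClusteredDistribution): partition keys must be a
  // subset of the required clustering keys and, with the proposed fix, no key
  // group may have been split across tasks.
  def satisfiesClustered(required: ClusteredDistribution): Boolean =
    !isPartiallyClustered && keys.forall(required.clusteringKeys.contains)
}

object PartialClusteringDemo {
  // One inner Seq per task; each row is (joinKey, payload).
  type Task = Seq[(Int, String)]

  // Mimics a per-task dropDuplicates("joinKey"): keeps one row per key
  // *within* each task, with no coordination between tasks.
  def dedupPerTask(tasks: Seq[Task]): Seq[(Int, String)] =
    tasks.flatMap(rows => rows.groupBy(_._1).values.map(_.head))

  // Mimics the Exchange EnsureRequirements would insert: regroups rows so
  // that every key lands in exactly one task.
  def shuffleByKey(tasks: Seq[Task]): Seq[Task] =
    tasks.flatten.groupBy(_._1).values.toSeq

  // Plans the downstream dedup: shuffle only if the child's partitioning
  // does not satisfy the required clustered distribution.
  def plannedDedup(child: KeyGroupedPartitioning, tasks: Seq[Task]): Seq[(Int, String)] = {
    val required = ClusteredDistribution(Seq("joinKey"))
    val input = if (child.satisfiesClustered(required)) tasks else shuffleByKey(tasks)
    dedupPerTask(input)
  }

  // Key 1 has been split across two tasks by partial clustering.
  val splitTasks: Seq[Task] = Seq(
    Seq(1 -> "a", 1 -> "b"),
    Seq(1 -> "c", 2 -> "d")
  )

  def main(args: Array[String]): Unit = {
    // Current behaviour: no flag, no Exchange, key 1 survives in both tasks.
    val buggy = plannedDedup(KeyGroupedPartitioning(Seq("joinKey")), splitTasks)
    // Proposed behaviour: flag set, Exchange inserted, one row per key.
    val fixed = plannedDedup(
      KeyGroupedPartitioning(Seq("joinKey"), isPartiallyClustered = true),
      splitTasks
    )
    println(s"without exchange: ${buggy.size} rows, with exchange: ${fixed.size} rows")
  }
}
```

In this model the unflagged plan returns three rows (key 1 twice) while the flagged plan returns two, one per key. The design choice being illustrated is that the fix lives in the satisfies check rather than in each dedup operator, so any operator requiring ClusteredDistribution would get the Exchange automatically.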
