HeartSaVioR commented on pull request #32875: URL: https://github.com/apache/spark/pull/32875#issuecomment-1023907445
When we specify HashClusteredDistribution on a stateful operator, we rely on three major assumptions:

1) HashClusteredDistribution creates HashPartitioning, and this will never change in the future.
2) The implementation of `partitionIdExpression` in HashPartitioning will never change in the future.
3) No partitioning other than HashPartitioning can satisfy HashClusteredDistribution.

(It may be better to leave a code comment documenting these assumptions, to prevent changes to HashClusteredDistribution that would break them.)

Let's say the child operator is range partitioned and we add a stateful operator with ClusteredDistribution as its required distribution. Range partitioning can satisfy ClusteredDistribution, but the physical partitioning of the child is totally different from the partitioning of the state, and this leads to a correctness issue (possibly a silent one).

It seems DataSourcePartitioning doesn't allow the partitioning from a data source to satisfy HashClusteredDistribution; it only checks against ClusteredDistribution. This must not change unless the partitioning from the data source guarantees the same physical partitioning as Spark's internal hash partitioning, and there is no way to guarantee that through the Partitioning interface.
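To make the range-partitioning scenario concrete, here is a toy Python sketch (not Spark code). It assumes the state was written under hash partitioning, using a toy hash (the sum of the key's UTF-8 bytes) as a stand-in for Spark's Murmur3-based `partitionIdExpression`, and a hypothetical range layout with boundaries `["c", "e", "g"]`. All helper names are hypothetical. Both partition functions are deterministic in the key, so each co-locates all rows of a key, which is all ClusteredDistribution asks for; yet most keys are routed to a partition that does not hold their state.

```python
from bisect import bisect_right


def hash_partition_id(key, num_partitions):
    # Toy stand-in for HashPartitioning.partitionIdExpression. Spark uses a
    # Murmur3 hash with pmod; summing UTF-8 bytes is an assumption for clarity.
    return sum(key.encode("utf-8")) % num_partitions


def range_partition_id(key, bounds):
    # Toy stand-in for range partitioning: partition i holds keys k with
    # bounds[i-1] <= k < bounds[i] (hypothetical boundaries).
    return bisect_right(bounds, key)


def lost_state_keys(keys, num_partitions, bounds):
    # State written by an earlier micro-batch, laid out by hash partitioning.
    state = [dict() for _ in range(num_partitions)]
    for key in keys:
        state[hash_partition_id(key, num_partitions)][key] = 1

    # The next micro-batch arrives range-partitioned. Each task can only read
    # the state of its own partition, so a key routed elsewhere sees no state.
    lost = []
    for key in keys:
        pid = range_partition_id(key, bounds)
        if key not in state[pid]:
            lost.append(key)
    return lost


if __name__ == "__main__":
    print(lost_state_keys(list("abcdefgh"), 4, ["c", "e", "g"]))
```

With keys `a` to `h` and 4 partitions, only `f` and `g` happen to land in the same partition under both schemes; the other six keys find no prior state, and nothing fails loudly.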