HeartSaVioR opened a new pull request #35673:
URL: https://github.com/apache/spark/pull/35673


   ### What changes were proposed in this pull request?
   
   This PR proposes to use StatefulOpClusteredDistribution for stateful
operators. It requires exactly the clustering keys, in the same order, and does
not accept partitioning on a subset of them. This keeps the partitioning of
stateful operators consistent across the lifetime of the query.
   (This does not cover the case where the grouping keys themselves are
changed. The state schema checker verifies such changes, but renaming is
allowed, so swapping keys that have the same data type is still allowed.)
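   To make the difference between the two requirements concrete, here is a
toy model. All names in it are hypothetical, and it is not Spark's actual
`Distribution`/`Partitioning` code. A child's partitioning is reduced to the
ordered list of columns it hash-partitions on. The loose check accepts any
subset of the clustering keys, while the strict check demands the exact keys,
in order, with the expected number of partitions.

```scala
// Toy model (hypothetical, not Spark's classes) of the two distribution
// requirements: the loose one modeled on ClusteredDistribution and the strict
// one modeled on StatefulOpClusteredDistribution.
object DistributionModel {
  // Stand-in for a child's hash partitioning: ordered key columns + partition count.
  final case class HashPartitioning(keys: Seq[String], numPartitions: Int)

  // Loose requirement: partitioning on any subset of the clustering keys still
  // co-locates all rows of a group, so it is accepted without a shuffle.
  def satisfiesClustered(p: HashPartitioning, clustering: Seq[String]): Boolean =
    p.keys.forall(clustering.contains)

  // Strict requirement: exactly the clustering keys, in the same order, with
  // the expected number of partitions; anything else forces a shuffle.
  def satisfiesStrict(
      p: HashPartitioning,
      clustering: Seq[String],
      expectedPartitions: Int): Boolean =
    p.keys == clustering && p.numPartitions == expectedPartitions
}
```

Under the loose check, a child hash-partitioned on `a` alone satisfies grouping
keys `(a, b)`. Under the strict check the same child is rejected, and that
rejection is what triggers the additional shuffle.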
   
   Applied unconditionally, this change would break existing queries whose
checkpoints were created prior to Spark 3.3 and cause silent correctness
issues. To remedy this, we introduce a new internal config,
`spark.sql.streaming.statefulOperator.useStrictDistribution`. It defaults to
true for new queries and to false for queries restarted from a checkpoint
created prior to Spark 3.3. When the config is false, stateful operators use
ClusteredDistribution, which keeps the old requirement on the child
distribution.
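   The defaulting rule above can be sketched as follows. This is a
hypothetical model, not Spark's code. It assumes the effective value is looked
up in the config entries recorded in the query's checkpoint, and that a
checkpoint written before Spark 3.3 simply has no entry for the key.

```scala
// Hypothetical sketch of how the strict-distribution flag could be resolved
// per query run. Assumes conf entries are recorded in the checkpoint metadata;
// a pre-3.3 checkpoint carries no entry for this key.
object StrictDistributionFlag {
  val Key = "spark.sql.streaming.statefulOperator.useStrictDistribution"

  /** @param checkpointConf conf recorded in an existing checkpoint, or None for a new query */
  def resolve(checkpointConf: Option[Map[String, String]]): Boolean =
    checkpointConf match {
      // New query: nothing recorded yet, so use the new (strict) default.
      case None => true
      // Restarted query: honor the recorded value. A missing key means the
      // checkpoint predates the config, so fall back to the old behavior.
      case Some(conf) => conf.get(Key).map(_.toBoolean).getOrElse(false)
    }
}
```

Because the fallback applies only when the key is absent, a query first
started on 3.3 keeps the strict behavior on every restart. Only queries
restored from older checkpoints keep the loose requirement.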
   
   Note that this change does not fix the root problem for old checkpoints.
A long-term fix should be crafted carefully after collecting evidence on the
impact of SPARK-38204 (e.g. how many end-user queries would encounter
SPARK-38204).
   
   This PR adds E2E tests for the cases that trigger SPARK-38204. They verify
the behavior for both new queries (Spark 3.3) and old queries (prior to 3.3).
   
   ### Why are the changes needed?
   
   Please refer to the description of JIRA issue
[SPARK-38204](https://issues.apache.org/jira/browse/SPARK-38204) for details,
as it is too long to include here.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. Stateful operators no longer accept a child output partitioning on a
subset of the grouping keys; in that case an additional shuffle is triggered.
This ensures consistent partitioning for stateful operators across the
lifetime of the query.
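   The following toy illustration shows why the partitioning must stay fixed.
It uses a simple deterministic hash of our own, not Spark's Murmur3-based
`HashPartitioning`, and the object name is hypothetical. The partition a
grouping key is routed to depends on which columns are hashed. Suppose one run
places the state for key `(1, 1)` using only column `a`, and a later run
routes rows by `(a, b)`. The rows for that key then arrive at a partition
that does not hold its state.

```scala
// Toy illustration (not Spark's hash function): the partition a key lands in
// depends on which of its columns are hashed.
object PartitionRouting {
  // Deterministic polynomial hash over the chosen columns, reduced modulo n.
  def partitionOf(cols: Seq[Int], numPartitions: Int): Int =
    Math.floorMod(cols.foldLeft(17)((acc, c) => 31 * acc + c), numPartitions)
}

// Grouping key (a = 1, b = 1), 4 partitions:
//   hashing only `a`     -> partition 0
//   hashing both (a, b)  -> partition 1
```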
   
   ### How was this patch tested?
   
   New UTs are added, including tests for backward compatibility.

