WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016174482
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -154,15 +179,24 @@ object UnsupportedOperationChecker extends Logging {
"DataFrames/Datasets")(plan)
}
- // Disallow multiple streaming aggregations
- val aggregates = collectStreamingAggregates(plan)
+ val statefulOps = plan.collect {
+ case p: LogicalPlan if isStatefulOperation(p) => p
+ }
- if (aggregates.size > 1) {
+ if (statefulOps.size > 1 &&
+ outputMode != InternalOutputModes.Append &&
+ SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {
throwError(
- "Multiple streaming aggregations are not supported with " +
- "streaming DataFrames/Datasets")(plan)
+ "Multiple stateful operators are not supported with " +
+ "streaming DataFrames/Datasets for Update and Complete mode. " +
+ "Only Append mode is supported. If you understand the possible risk of " +
+ "correctness issue and still need to run the query you can " +
+ "disable this check by setting the config " +
+ "`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.")(plan)
Review Comment:
The related tests are in lines 237 to 278 of
https://github.com/apache/spark/blob/3637b807d57ff5c534386132fb9d47c7cce72705/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L237-L278
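To make the condition in the diff concrete, here is a minimal, self-contained Scala sketch of the same decision. `PlanNode`, `OutputMode`, and `StatefulOpCheck.check` are hypothetical stand-ins, not Spark's `LogicalPlan`, `InternalOutputModes`, or `isStatefulOperation`; the `checkEnabled` flag models `SQLConf.get.statefulOperatorCorrectnessCheckEnabled`, and the sketch returns an `Option` where the real checker calls `throwError`.

```scala
// Hypothetical, simplified model of the multi-stateful-operator check from the diff.
// None of these types are Spark classes; they only mimic the shape of the logic.

sealed trait OutputMode
object OutputMode {
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode
}

// Toy plan node: `stateful` marks operators that keep state across micro-batches
// (a stand-in for isStatefulOperation(p) in the real checker).
final case class PlanNode(name: String, stateful: Boolean, children: Seq[PlanNode] = Nil) {
  // Pre-order traversal that gathers every node the partial function matches,
  // similar in spirit to Catalyst's TreeNode.collect.
  def collect[B](pf: PartialFunction[PlanNode, B]): Seq[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}

object StatefulOpCheck {
  val ErrorMessage: String =
    "Multiple stateful operators are not supported with streaming " +
      "DataFrames/Datasets for Update and Complete mode. Only Append mode is supported."

  // Returns Some(message) when the plan would be rejected, None when it passes.
  // `checkEnabled` stands in for SQLConf.get.statefulOperatorCorrectnessCheckEnabled.
  def check(plan: PlanNode, outputMode: OutputMode, checkEnabled: Boolean): Option[String] = {
    val statefulOps = plan.collect { case p if p.stateful => p }
    if (statefulOps.size > 1 && outputMode != OutputMode.Append && checkEnabled) Some(ErrorMessage)
    else None
  }
}
```

In this model, a plan such as a (hypothetical) aggregation over a stream-stream join is rejected only when all three conditions from the diff hold: more than one stateful node, a non-Append output mode, and the check enabled. A plan with a single stateful node always passes.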
--
This is an automated message from the Apache Git Service.