WweiL commented on code in PR #52642:
URL: https://github.com/apache/spark/pull/52642#discussion_r2460950236
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -344,9 +344,40 @@ class MicroBatchExecution(
setLatestExecutionContext(execCtx)
populateStartOffsets(execCtx, sparkSessionForStream)
+
+ // SPARK-53941: This code path is executed for the first batch, regardless
of whether it's a
+ // fresh new run or restart.
+ disableAQESupportInStatelessIfUnappropriated(sparkSessionForStream)
+
logInfo(log"Stream started from ${MDC(LogKeys.STREAMING_OFFSETS_START,
execCtx.startOffsets)}")
execCtx
}
+
+ private def disableAQESupportInStatelessIfUnappropriated(
+ sparkSessionToRunBatches: SparkSession): Unit = {
+ def containsStatefulOperator(p: LogicalPlan): Boolean = {
+ p.exists {
+ case node: Aggregate if node.isStreaming => true
+ case node: Deduplicate if node.isStreaming => true
+ case node: DeduplicateWithinWatermark if node.isStreaming => true
+ case node: Distinct if node.isStreaming => true
+ case node: Join if node.left.isStreaming && node.right.isStreaming =>
true
+ case node: FlatMapGroupsWithState if node.isStreaming => true
+ case node: FlatMapGroupsInPandasWithState if node.isStreaming => true
+ case node: TransformWithState if node.isStreaming => true
+ case node: TransformWithStateInPySpark if node.isStreaming => true
+ case node: GlobalLimit if node.isStreaming => true
+ case _ => false
+ }
+ }
+
+ if (containsStatefulOperator(analyzedPlan)) {
+ // SPARK-53941: We disable AQE for stateful workloads as of now.
+ logWarning(log"Disabling AQE since AQE is not supported in stateful
workloads.")
+
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key,
"false")
Review Comment:
Do you think it makes sense to make this restriction stronger? In addition
to set this conf there, we also set this AQE disabled as a property of the
query, and perform a conf check every batch. This can prevent users override
this config during foreachBatch and save potential support burden. I can add a
followup if you think it makes sense
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]