HeartSaVioR commented on code in PR #52642:
URL: https://github.com/apache/spark/pull/52642#discussion_r2462473303
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -344,9 +344,40 @@ class MicroBatchExecution(
     setLatestExecutionContext(execCtx)
     populateStartOffsets(execCtx, sparkSessionForStream)
+
+    // SPARK-53941: This code path is executed for the first batch, regardless of
+    // whether it's a fresh new run or a restart.
+    disableAQESupportInStatelessIfUnappropriated(sparkSessionForStream)
+
     logInfo(log"Stream started from ${MDC(LogKeys.STREAMING_OFFSETS_START, execCtx.startOffsets)}")
     execCtx
   }
+
+  private def disableAQESupportInStatelessIfUnappropriated(
+      sparkSessionToRunBatches: SparkSession): Unit = {
+    def containsStatefulOperator(p: LogicalPlan): Boolean = {
+      p.exists {
+        case node: Aggregate if node.isStreaming => true
+        case node: Deduplicate if node.isStreaming => true
+        case node: DeduplicateWithinWatermark if node.isStreaming => true
+        case node: Distinct if node.isStreaming => true
+        case node: Join if node.left.isStreaming && node.right.isStreaming => true
+        case node: FlatMapGroupsWithState if node.isStreaming => true
+        case node: FlatMapGroupsInPandasWithState if node.isStreaming => true
+        case node: TransformWithState if node.isStreaming => true
+        case node: TransformWithStateInPySpark if node.isStreaming => true
+        case node: GlobalLimit if node.isStreaming => true
+        case _ => false
+      }
+    }
+
+    if (containsStatefulOperator(analyzedPlan)) {
+      // SPARK-53941: We disable AQE for stateful workloads as of now.
+      logWarning(log"Disabling AQE since AQE is not supported in stateful workloads.")
+
+      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
Review Comment:
While I think this direction is great in general, it does not completely prevent the issue, because we only check the config after the change has already been applied for a single batch.
If we want to prevent the issue holistically, we should actually disallow setting the configs under the streaming engine's control in the foreachBatch (FEB) sink, either by ignoring the change with logging, or even by failing the query. This may need more thought, since the user function is not under the streaming engine's control while it is executing. For example, the Spark session obtained from the DataFrame given to the user function should not allow changing a list of configs under the streaming engine's control.
But even with the above, we can't prevent the case where the user function references an external Spark session. That's more of a fundamental issue, so it'd probably be fine if the solution does not address this.
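To make the "ignore with logging, or fail the query" idea concrete, here is a minimal sketch of a config guard. This is not part of this PR; the names `GuardedConf`, `protectedConfs`, and `trySet` are illustrative assumptions, and a real implementation would hook into the session's `RuntimeConfig` rather than a plain map:

```scala
// Hypothetical sketch of guarding engine-controlled configs in the FEB sink.
object GuardedConf {
  // Assumed list of configs the streaming engine reserves while batches run.
  private val protectedConfs: Set[String] = Set(
    "spark.sql.adaptive.enabled"
  )

  /** Ignore-with-logging variant: drop the change instead of applying it. */
  def trySet(
      conf: scala.collection.mutable.Map[String, String],
      key: String,
      value: String): Unit = {
    if (protectedConfs.contains(key)) {
      // Stricter variant per the comment above: throw here to fail the query.
      println(s"WARN: ignoring change to engine-controlled conf '$key'")
    } else {
      conf.put(key, value)
    }
  }
}
```

Either variant only covers sessions the engine hands to the user function; as noted above, an externally captured session bypasses the guard.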
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]