cloud-fan commented on code in PR #49715:
URL: https://github.com/apache/spark/pull/49715#discussion_r1944008785
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -523,15 +516,35 @@ case class AdaptiveSparkPlanExec(
/**
* This method is called recursively to traverse the plan tree bottom-up and create a new query
- * stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of
- * its child stages have been materialized.
+ * stage or try reusing an existing stage if the current node is an [[Exchange]] node or root node
+ * and all of its child stages have been materialized.
*
* With each call, it returns:
* 1) The new plan replaced with [[QueryStageExec]] nodes where new stages are created.
* 2) Whether the child query stages (if any) of the current node have all been materialized.
* 3) A list of the new query stages that have been created.
*/
- private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match {
+ private def createQueryStages(
+ resultHandler: SparkPlan => Any,
+ plan: SparkPlan,
+ firstRun: Boolean): CreateStageResult = plan match {
+ case resultStage@ResultQueryStageExec(_, optimizedPlan, _) =>
+ if (firstRun) {
+ // There is already an existing ResultQueryStage created in previous `withFinalPlanUpdate`
+ // e.g, when we do `df.collect` multiple times
Review Comment:
Let's say more about it:
```
Here we create a new result stage to execute the query again, which avoids caching the result and matches the non-AQE behavior.
```
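To make the doc comment and the `firstRun` branch concrete, here is a toy Scala model of the bottom-up recursion. It is a sketch, not Spark code: `Plan`, `Leaf`, `Exchange`, `Op`, `Stage`, `ResultStage` and `StageSketch` are hypothetical stand-ins, `resultHandler` is omitted, and keeping the existing result stage when `firstRun` is false is an assumption about the intended behavior rather than something the diff shows.

```scala
// Toy model of the recursion described in the doc comment. Every type and
// function here is hypothetical; none of it is Spark's API.
object StageSketch {
  sealed trait Plan
  final case class Leaf(name: String) extends Plan
  final case class Exchange(child: Plan) extends Plan
  final case class Op(name: String, children: List[Plan]) extends Plan
  // A shuffle-like query stage; `id` lets us tell a new stage from an old one.
  final case class Stage(id: Int, plan: Plan, materialized: Boolean) extends Plan
  // Stand-in for ResultQueryStageExec (the real one also carries a result handler).
  final case class ResultStage(id: Int, plan: Plan) extends Plan

  // The three values listed in the doc comment.
  final case class CreateStageResult(
      newPlan: Plan,
      allChildStagesMaterialized: Boolean,
      newStages: List[Plan])

  private var nextId = 0
  private def freshId(): Int = { nextId += 1; nextId }

  def createQueryStages(plan: Plan, firstRun: Boolean): CreateStageResult = plan match {
    case old @ ResultStage(_, inner) =>
      if (firstRun) {
        // A result stage left over from an earlier run (e.g. a second `collect`):
        // build a new one so the query runs again instead of reusing a cached result.
        val fresh = ResultStage(freshId(), inner)
        CreateStageResult(fresh, allChildStagesMaterialized = false, newStages = List(fresh))
      } else {
        // Assumption: within the same run, the existing result stage is kept unchanged.
        CreateStageResult(old, allChildStagesMaterialized = false, newStages = Nil)
      }
    case s: Stage =>
      // An existing stage is reported as materialized or not, and never recreated.
      CreateStageResult(s, s.materialized, Nil)
    case Exchange(child) =>
      // Bottom-up: handle the child first, then decide whether this exchange becomes a stage.
      val r = createQueryStages(child, firstRun)
      if (r.allChildStagesMaterialized) {
        val stage = Stage(freshId(), Exchange(r.newPlan), materialized = false)
        CreateStageResult(stage, allChildStagesMaterialized = false, newStages = r.newStages :+ stage)
      } else {
        CreateStageResult(Exchange(r.newPlan), allChildStagesMaterialized = false, newStages = r.newStages)
      }
    case Leaf(_) =>
      // A leaf has no child stages, so its (empty) set of child stages is trivially materialized.
      CreateStageResult(plan, allChildStagesMaterialized = true, newStages = Nil)
    case Op(name, children) =>
      // Generic operator: recurse into every child and merge the three results.
      val rs = children.map(createQueryStages(_, firstRun))
      CreateStageResult(
        Op(name, rs.map(_.newPlan)),
        rs.forall(_.allChildStagesMaterialized),
        rs.flatMap(_.newStages))
  }
}
```

In this model, passing a `ResultStage` left over from an earlier `collect` with `firstRun = true` yields a new `ResultStage` with a fresh id, i.e. the query is executed again rather than served from the old stage; with `firstRun = false` the same stage comes back untouched.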