cloud-fan commented on code in PR #49715:
URL: https://github.com/apache/spark/pull/49715#discussion_r1944008785


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -523,15 +516,35 @@ case class AdaptiveSparkPlanExec(
 
   /**
    * This method is called recursively to traverse the plan tree bottom-up and create a new query
-   * stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of
-   * its child stages have been materialized.
+   * stage or try reusing an existing stage if the current node is an [[Exchange]] node or root node
+   * and all of its child stages have been materialized.
    *
    * With each call, it returns:
    * 1) The new plan replaced with [[QueryStageExec]] nodes where new stages are created.
    * 2) Whether the child query stages (if any) of the current node have all been materialized.
    * 3) A list of the new query stages that have been created.
    */
-  private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match {
+  private def createQueryStages(
+      resultHandler: SparkPlan => Any,
+      plan: SparkPlan,
+      firstRun: Boolean): CreateStageResult = plan match {
+    case resultStage@ResultQueryStageExec(_, optimizedPlan, _) =>
+      if (firstRun) {
+        // There is already an existing ResultQueryStage created in previous `withFinalPlanUpdate`
+        // e.g, when we do `df.collect` multiple times

Review Comment:
   Let's say more about it, for example:
   ```
   Here we create a new result stage to execute it again, to avoid caching the result
   and to simulate the behavior of non-AQE.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

