StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1142444491


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -119,17 +155,40 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    * @param plan Input query plan to process
   * @param startOperatorID The start value of operation id. The subsequent operations will be
   *                        assigned higher value.
+   * @param visited A unique set of operators visited by generateOperatorIDs. The set is scoped
+   *                at the callsite function processPlan. It serves two purposes: Firstly, it is
+   *                used to avoid accidentally overwriting existing IDs that were generated in
+   *                the same processPlan call. Secondly, it is used to allow for intentional ID
+   *                overwriting as part of SPARK-42753 where an Adaptively Optimized Out Exchange
+   *                and its subtree may contain IDs that were generated in a previous AQE
+   *                iteration's processPlan call which would result in incorrect IDs.
+   * @param reusedExchanges A unique set of ReusedExchange nodes visited which will be used to
+   *                        identify adaptively optimized out exchanges in SPARK-42753.
+   * @param addReusedExchanges Whether to add ReusedExchange nodes to reusedExchanges set. We set it
+   *                           to false to avoid processing more nested ReusedExchanges nodes in the
+   *                           subtree of an Adaptively Optimized Out Exchange.
   * @return The last generated operation id for this input plan. This is to ensure we always
   *         assign incrementing unique id to each operator.
    */
-  private def generateOperatorIDs(plan: QueryPlan[_], startOperatorID: Int): Int = {
+  private def generateOperatorIDs(
+      plan: QueryPlan[_],
+      startOperatorID: Int,
+      visited: Set[QueryPlan[_]],
+      reusedExchanges: ArrayBuffer[ReusedExchangeExec],
+      addReusedExchanges: Boolean): Int = {
     var currentOperationID = startOperatorID
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return currentOperationID
     }
 
-    def setOpId(plan: QueryPlan[_]): Unit = if (plan.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {

Review Comment:
   I'm removing the "if OP_ID_TAG is empty" check because we now allow the
   `OP_ID_TAG` to be overwritten. The subtree of an "Optimized Out Exchange" may
   contain nodes whose `OP_ID_TAG` was generated by a `processPlan` call in a
   previous AQE iteration. Such an `OP_ID_TAG` would be incorrect and needs to be
   overwritten.
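
   To illustrate the idea, here is a minimal, self-contained sketch. It does
   not use Spark's classes: `Node`, `opId`, and `OpIdSketch.generateIds` are
   hypothetical stand-ins for `QueryPlan`, `OP_ID_TAG`, and
   `generateOperatorIDs`, and the traversal order is simplified. It only shows
   how a per-call `visited` set can replace the is-empty check: a node is
   numbered at most once per call, while a stale ID left over from an earlier
   call is overwritten.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a plan node; this is NOT Spark's QueryPlan.
// Node does not override equals/hashCode, so set membership is by reference.
final class Node(val name: String, val children: Seq[Node] = Nil) {
  var opId: Option[Int] = None // plays the role of OP_ID_TAG
}

object OpIdSketch {
  // Assigns IDs in post-order and returns the last ID handed out.
  // `visited` is meant to be created once per top-level call.
  def generateIds(node: Node, start: Int, visited: mutable.Set[Node]): Int = {
    var current = start
    node.children.foreach { child =>
      current = generateIds(child, current, visited)
    }
    // `add` returns true only if the node was not already in the set,
    // so an ID assigned earlier in this same call is never overwritten.
    if (visited.add(node)) {
      current += 1
      node.opId = Some(current) // overwrites any stale ID from a previous call
    }
    current
  }
}
```

   With a node that is reachable twice and carries a stale ID, the stale value
   is replaced on the first visit and left alone on the second, which is the
   behavior the is-empty check could not provide.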



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

