This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f25df1dc25 [SPARK-43979][SQL][FOLLOWUP] `transformUpWithNewOutput` 
should only be used with new outputs
4f25df1dc25 is described below

commit 4f25df1dc25cc4f002107821cc67e35c1fe0e42c
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Aug 9 19:16:37 2023 +0800

    [SPARK-43979][SQL][FOLLOWUP] `transformUpWithNewOutput` should only be used 
with new outputs
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/41475 . It's 
risky to use `transformUpWithNewOutput` with existing attribute ids. If the 
plan contains duplicated attribute ids somewhere, then we will hit conflicting 
attributes and an assertion error will be thrown by 
`QueryPlan#transformUpWithNewOutput`.
    
    This PR takes a different approach. We canonicalize the plan first and then 
remove the alias-only project. Then we don't need `transformUpWithNewOutput` 
anymore as all attribute ids are normalized in the canonicalized plan.
    
    ### Why are the changes needed?
    
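    Fix a potential bug: if the plan contains duplicated attribute ids, 
`QueryPlan#transformUpWithNewOutput` hits conflicting attributes and throws an 
assertion error.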
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #42408 from cloud-fan/collect-metrics.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala   | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index fee5660017c..0b953fc2b61 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1080,13 +1080,13 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
     def check(plan: LogicalPlan): Unit = plan.foreach { node =>
       node match {
         case metrics @ CollectMetrics(name, _, _) =>
-          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
+          val simplifiedMetrics = 
simplifyPlanForCollectedMetrics(metrics.canonicalized)
           metricsMap.get(name) match {
             case Some(other) =>
-              val simplifiedOther = simplifyPlanForCollectedMetrics(other)
+              val simplifiedOther = 
simplifyPlanForCollectedMetrics(other.canonicalized)
               // Exact duplicates are allowed. They can be the result
               // of a CTE that is used multiple times or a self join.
-              if (!simplifiedMetrics.sameResult(simplifiedOther)) {
+              if (simplifiedMetrics != simplifiedOther) {
                 failAnalysis(
                   errorClass = "DUPLICATED_METRICS_NAME",
                   messageParameters = Map("metricName" -> name))
@@ -1111,7 +1111,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
    * duplicates metric definition.
    */
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan 
= {
-    plan.transformUpWithNewOutput {
+    plan.resolveOperators {
       case p: Project if p.projectList.size == p.child.output.size =>
         val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
           case (left: Alias, right: Attribute) =>
@@ -1119,9 +1119,9 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
           case _ => false
         }
         if (assignExprIdOnly) {
-          (p.child, p.output.zip(p.child.output))
+          p.child
         } else {
-          (p, Nil)
+          p
         }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to