cloud-fan commented on a change in pull request #31666:
URL: https://github.com/apache/spark/pull/31666#discussion_r604165763



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -999,41 +999,31 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Adds metadata columns to output for child relations when nodes are missing resolved attributes.
    *
    * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
-   * but the relation's output does not include the metadata columns until the relation is replaced
-   * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
-   * relation's output, the analyzer will detect that nothing produces the columns.
+   * but the relation's output does not include the metadata columns until the relation is replaced.
+   * Unless this rule adds metadata to the relation's output, the analyzer will detect that nothing
+   * produces the columns.
    *
    * This rule only adds metadata columns when a node is resolved but is missing input from its
    * children. This ensures that metadata columns are not added to the plan unless they are used. By
    * checking only resolved nodes, this ensures that * expansion is already done so that metadata
-   * columns are not accidentally selected by *.
+   * columns are not accidentally selected by *. This rule resolves operators downwards to avoid
+   * projecting away metadata columns prematurely.
    */
   object AddMetadataColumns extends Rule[LogicalPlan] {
-    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
 
-    private def hasMetadataCol(plan: LogicalPlan): Boolean = {
-      plan.expressions.exists(_.find {
-        case a: Attribute => a.isMetadataCol
-        case _ => false
-      }.isDefined)
-    }
+    import org.apache.spark.sql.catalyst.util._
 
-    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
-      case r: DataSourceV2Relation => r.withMetadataColumns()
-      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
-    }
-
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+      // Add metadata output to all node types
+      case node if node.children.nonEmpty && node.resolved && node.missingInput.nonEmpty &&

Review comment:
       let's not call `missingInput` here, as it has perf issues; see https://github.com/apache/spark/pull/31440
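
       For example, something like this could keep the guard cheap (just a sketch, reusing the `hasMetadataCol` helper shape this diff removes; the body of the match is elided):

```scala
// Sketch only: gate the rule on a cheap metadata-column scan instead of
// calling `missingInput` (which builds AttributeSets) on every node.
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util._ // provides `isMetadataCol`

object AddMetadataColumns extends Rule[LogicalPlan] {

  // Same shape as the helper removed in this diff: true if any expression in
  // the node references an attribute that is marked as a metadata column.
  private def hasMetadataCol(plan: LogicalPlan): Boolean = {
    plan.expressions.exists(_.find {
      case a: Attribute => a.isMetadataCol
      case _ => false
    }.isDefined)
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
    // Only nodes that actually reference metadata columns reach the heavier
    // resolution logic; all other nodes are skipped by the cheap scan above.
    case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
      // ... add the metadata columns to the children's output here ...
      node
  }
}
```

       The guard only walks the node's expressions and allocates no per-node `AttributeSet`s, which is what made `missingInput` a problem in the linked PR.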




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


