cloud-fan commented on a change in pull request #31666:
URL: https://github.com/apache/spark/pull/31666#discussion_r604165763
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -999,41 +999,31 @@ class Analyzer(override val catalogManager: CatalogManager)
  * Adds metadata columns to output for child relations when nodes are missing resolved attributes.
  *
  * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
- * but the relation's output does not include the metadata columns until the relation is replaced
- * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
- * relation's output, the analyzer will detect that nothing produces the columns.
+ * but the relation's output does not include the metadata columns until the relation is replaced.
+ * Unless this rule adds metadata to the relation's output, the analyzer will detect that nothing
+ * produces the columns.
  *
  * This rule only adds metadata columns when a node is resolved but is missing input from its
  * children. This ensures that metadata columns are not added to the plan unless they are used. By
  * checking only resolved nodes, this ensures that * expansion is already done so that metadata
- * columns are not accidentally selected by *.
+ * columns are not accidentally selected by *. This rule resolves operators downwards to avoid
+ * projecting away metadata columns prematurely.
  */
 object AddMetadataColumns extends Rule[LogicalPlan] {
-  import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-  private def hasMetadataCol(plan: LogicalPlan): Boolean = {
-    plan.expressions.exists(_.find {
-      case a: Attribute => a.isMetadataCol
-      case _ => false
-    }.isDefined)
-  }
+  import org.apache.spark.sql.catalyst.util._
-  private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
-    case r: DataSourceV2Relation => r.withMetadataColumns()
-    case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
-  }
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-    case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+    // Add metadata output to all node types
+    case node if node.children.nonEmpty && node.resolved && node.missingInput.nonEmpty &&
Review comment:
let's not call `missingInput` here as it has perf issues, see
https://github.com/apache/spark/pull/31440
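For context on the concern: a guard like the removed `hasMetadataCol` in the diff above short-circuits at the first metadata attribute it finds, whereas `missingInput` has to materialize attribute-set differences for each node it is called on. A minimal, self-contained Scala sketch of that short-circuiting pattern, using hypothetical `Attr`/`Func`/`Node` types (this models the shape of the check only, not Spark's actual `LogicalPlan` API):

```scala
// Hypothetical, simplified expression/plan model -- not Spark's real classes.
sealed trait Expr
case class Attr(name: String, isMetadataCol: Boolean) extends Expr
case class Func(children: Seq[Expr]) extends Expr

case class Node(expressions: Seq[Expr], children: Seq[Node])

// Depth-first existence check over one expression tree; `exists` stops
// at the first metadata attribute instead of building full sets.
def containsMetadataCol(e: Expr): Boolean = e match {
  case a: Attr  => a.isMetadataCol
  case Func(cs) => cs.exists(containsMetadataCol)
}

// Analogous to the removed hasMetadataCol: only inspects the node's own
// expressions, short-circuiting as soon as a metadata column is seen.
def hasMetadataCol(plan: Node): Boolean =
  plan.expressions.exists(containsMetadataCol)

val plan = Node(Seq(Func(Seq(Attr("id", isMetadataCol = false),
                             Attr("_file", isMetadataCol = true)))), Nil)
```

The design point is that an `exists`-style traversal does only as much work as needed to find one witness, while a set-difference guard pays the full cost of constructing both sets on every invocation.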
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]