olaky commented on code in PR #40321: URL: https://github.com/apache/spark/pull/40321#discussion_r1134507196
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala: ########## @@ -281,6 +281,53 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { ) } + metadataColumnsTest("metadata propagates through projections automatically", + schema) { (df, f0, f1) => + + checkAnswer( + df.select("name") + .withColumn("m", col("_metadata").getField("file_name")), + Seq( + Row("jack", f0(METADATA_FILE_NAME)), + Row("lily", f1(METADATA_FILE_NAME)) + ) + ) + } + + metadataColumnsTest("metadata propagates through subqueries only manually", + schema) { (df, f0, f1) => + + // Metadata columns do not automatically propagate through subqueries. + checkError( + exception = intercept[AnalysisException] { + df.select("name").as("s").select("name") + .withColumn("m", col("_metadata").getField("file_name")) + }, + errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map("objectName" -> "`_metadata`", "proposal" -> "`s`.`name`")) + + // A metadata column manually propagated through a subquery is available. + checkAnswer( + df.select("name", "_metadata").as("s").select("name") + .withColumn("m", col("_metadata").getField("file_name")), + Seq( + Row("jack", f0(METADATA_FILE_NAME)), + Row("lily", f1(METADATA_FILE_NAME)) + ) + ) + + // A metadata column manually propagated multiple subqueries is available. + checkAnswer( + df.select("name", "_metadata").as("s") + .select("name", "_metadata").as("t").select("name") + .withColumn("m", col("_metadata").getField("file_name")), + Seq( + Row("jack", f0(METADATA_FILE_NAME)), + Row("lily", f1(METADATA_FILE_NAME)) + ) + ) + } + Review Comment: Maybe one case where there is a node between the subquery and the leaf? 
So maybe df.select("name", "_metadata").as("s").filter(...).select("name", "_metadata").as("t").select("name") ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ########## @@ -1685,14 +1685,13 @@ case class SubqueryAlias( } override def metadataOutput: Seq[Attribute] = { - // Propagate metadata columns from leaf nodes through a chain of `SubqueryAlias`. - if (child.isInstanceOf[LeafNode] || child.isInstanceOf[SubqueryAlias]) { - val qualifierList = identifier.qualifier :+ alias - val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.qualifiedAccessOnly) - nonHiddenMetadataOutput.map(_.withQualifier(qualifierList)) - } else { - Nil - } + // Propagate any metadata column that is already in the child's output. Also propagate + // non-hidden metadata columns from leaf nodes through a chain of `SubqueryAlias`. + val keepChildMetadata = child.isInstanceOf[LeafNode] || child.isInstanceOf[SubqueryAlias] + val qualifierList = identifier.qualifier :+ alias + child.metadataOutput + .filter { a => child.outputSet.contains(a) || (keepChildMetadata && !a.qualifiedAccessOnly) } Review Comment: Thinking out loud: Is it possible that not the attribute, but for example an attribute reference to the attribute is part of the outputSet? 
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala: ########## @@ -281,6 +281,53 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { ) } + metadataColumnsTest("metadata propagates through projections automatically", + schema) { (df, f0, f1) => + + checkAnswer( + df.select("name") + .withColumn("m", col("_metadata").getField("file_name")), + Seq( + Row("jack", f0(METADATA_FILE_NAME)), + Row("lily", f1(METADATA_FILE_NAME)) + ) + ) + } + + metadataColumnsTest("metadata propagates through subqueries only manually", + schema) { (df, f0, f1) => + + // Metadata columns do not automatically propagate through subqueries. + checkError( + exception = intercept[AnalysisException] { + df.select("name").as("s").select("name") + .withColumn("m", col("_metadata").getField("file_name")) + }, + errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map("objectName" -> "`_metadata`", "proposal" -> "`s`.`name`")) + + // A metadata column manually propagated through a subquery is available. + checkAnswer( + df.select("name", "_metadata").as("s").select("name") + .withColumn("m", col("_metadata").getField("file_name")), + Seq( + Row("jack", f0(METADATA_FILE_NAME)), + Row("lily", f1(METADATA_FILE_NAME)) + ) + ) + + // A metadata column manually propagated multiple subqueries is available. Review Comment: ```suggestion // A metadata column manually propagated through multiple subqueries is available. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org