olaky commented on code in PR #40321:
URL: https://github.com/apache/spark/pull/40321#discussion_r1134507196


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -281,6 +281,53 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
     )
   }
 
+  metadataColumnsTest("metadata propagates through projections automatically",
+    schema) { (df, f0, f1) =>
+
+    checkAnswer(
+      df.select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata propagates through subqueries only manually",
+    schema) { (df, f0, f1) =>
+
+    // Metadata columns do not automatically propagate through subqueries.
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select("name").as("s").select("name")
+          .withColumn("m", col("_metadata").getField("file_name"))
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`_metadata`", "proposal" -> 
"`s`.`name`"))
+
+    // A metadata column manually propagated through a subquery is available.
+    checkAnswer(
+      df.select("name", "_metadata").as("s").select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+
+    // A metadata column manually propagated multiple subqueries is available.
+    checkAnswer(
+      df.select("name", "_metadata").as("s")
+        .select("name", "_metadata").as("t").select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+

Review Comment:
   Maybe one case where this is a node between the subquery and the leaf? So 
maybe
   
   df.select("name", "_metadata").as("s").filter(...).select("name", 
"_metadata").as("t").select("name")



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -1685,14 +1685,13 @@ case class SubqueryAlias(
   }
 
   override def metadataOutput: Seq[Attribute] = {
-    // Propagate metadata columns from leaf nodes through a chain of 
`SubqueryAlias`.
-    if (child.isInstanceOf[LeafNode] || child.isInstanceOf[SubqueryAlias]) {
-      val qualifierList = identifier.qualifier :+ alias
-      val nonHiddenMetadataOutput = 
child.metadataOutput.filter(!_.qualifiedAccessOnly)
-      nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
-    } else {
-      Nil
-    }
+    // Propagate any metadata column that is already in the child's output. 
Also propagate
+    // non-hidden metadata columns from leaf nodes through a chain of 
`SubqueryAlias`.
+    val keepChildMetadata = child.isInstanceOf[LeafNode] || 
child.isInstanceOf[SubqueryAlias]
+    val qualifierList = identifier.qualifier :+ alias
+    child.metadataOutput
+      .filter { a => child.outputSet.contains(a) || (keepChildMetadata && 
!a.qualifiedAccessOnly) }

Review Comment:
   Thinking out loud: Is it possible that not the attribute, but for example an 
attribute reference to the attribute is part of the outputSet?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -281,6 +281,53 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
     )
   }
 
+  metadataColumnsTest("metadata propagates through projections automatically",
+    schema) { (df, f0, f1) =>
+
+    checkAnswer(
+      df.select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata propagates through subqueries only manually",
+    schema) { (df, f0, f1) =>
+
+    // Metadata columns do not automatically propagate through subqueries.
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select("name").as("s").select("name")
+          .withColumn("m", col("_metadata").getField("file_name"))
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`_metadata`", "proposal" -> 
"`s`.`name`"))
+
+    // A metadata column manually propagated through a subquery is available.
+    checkAnswer(
+      df.select("name", "_metadata").as("s").select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+
+    // A metadata column manually propagated multiple subqueries is available.

Review Comment:
   ```suggestion
       // A metadata column manually propagated through multiple subqueries is 
available.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to