fred-db commented on code in PR #38941:
URL: https://github.com/apache/spark/pull/38941#discussion_r1054204094


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala:
##########
@@ -232,4 +247,220 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       )
     }
   }
+
+  test("SPARK-41498: Metadata column is propagated through union") {
+    withTable(tbl) {
+      prepareTable()
+      val df = spark.table(tbl)
+      val dfQuery = df.union(df).select("id", "data", "index", "_partition")
+      val expectedAnswer = Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), 
Row(3, "c", 0, "1/3"))
+      checkAnswer(dfQuery, expectedAnswer ++ expectedAnswer)
+    }
+  }
+
+  test("SPARK-41498: Nested metadata column is propagated through union") {
+    withTempDir { dir =>
+      spark.range(start = 0, end = 10, step = 1, numPartitions = 1)
+        .write.mode("overwrite").save(dir.getAbsolutePath)
+      val df = spark.read.load(dir.getAbsolutePath)
+      val dfQuery = df.union(df).select("_metadata.file_path")
+
+      val filePath = dir.listFiles(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = 
name.endsWith(".parquet")
+      }).map(_.getAbsolutePath)
+      assert(filePath.length == 1)
+      val expectedAnswer = (1 to 20).map(_ => Row("file:" ++ filePath.head))
+      checkAnswer(dfQuery, expectedAnswer)
+    }
+  }
+
+  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
+    "have metadata output of different size") {
+    withTable(tbl) {
+      prepareTable()
+      withTempDir { dir =>
+        spark.range(start = 10, end = 20).selectExpr("bigint(id) as id", 
"string(id) as data")
+          .write.mode("overwrite").save(dir.getAbsolutePath)
+        val df1 = spark.table(tbl)
+        val df2 = spark.read.load(dir.getAbsolutePath)
+
+        // Make sure one df contains a metadata column and the other does not
+        assert(!df1.queryExecution.analyzed.metadataOutput.exists(_.name == 
"_metadata"))
+        assert(df2.queryExecution.analyzed.metadataOutput.exists(_.name == 
"_metadata"))
+
+        assert(df1.union(df2).queryExecution.analyzed.metadataOutput.isEmpty)
+      }
+    }
+  }
+
+  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
+    "have a type mismatch in a metadata column") {
+    val tbl = "testCatalog.t"
+    val typeMismatchTbl = "typeMismatch.t"
+    withTable(tbl, typeMismatchTbl) {
+      spark.range(10).write.saveAsTable(tbl)
+      val df = spark.table(tbl)
+      spark.range(10).write.saveAsTable(typeMismatchTbl)
+      val typeMismatchDf = spark.table(typeMismatchTbl)
+      
assert(df.union(typeMismatchDf).queryExecution.analyzed.metadataOutput.isEmpty)
+    }
+  }
+
+  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
+    "have an attribute name mismatch in a metadata column") {
+    val tbl = "testCatalog.t"
+    val nameMismatchTbl = "nameMismatch.t"
+    withTable(tbl, nameMismatchTbl) {
+      spark.range(10).write.saveAsTable(tbl)
+      val df = spark.table(tbl)
+      spark.range(10).write.saveAsTable(nameMismatchTbl)
+      val nameMismatchDf = spark.table(nameMismatchTbl)
+      
assert(df.union(nameMismatchDf).queryExecution.analyzed.metadataOutput.isEmpty)
+    }
+  }
+
+  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
+    "have a field name mismatch in a metadata column") {
+    val tbl = "testCatalog.t"
+    val fieldNameMismatchTbl = "fieldNameMismatch.t"
+    withTable(tbl, fieldNameMismatchTbl) {
+      spark.range(10).write.saveAsTable(tbl)
+      val df = spark.table(tbl)
+      spark.range(10).write.saveAsTable(fieldNameMismatchTbl)
+      val fieldNameMismatchDf = spark.table(fieldNameMismatchTbl)
+      
assert(df.union(fieldNameMismatchDf).queryExecution.analyzed.metadataOutput.isEmpty)
+    }
+  }
+}
+
+class MetadataTestTable(
+    name: String,
+    schema: StructType,
+    partitioning: Array[Transform],
+    properties: java.util.Map[String, String])
+  extends InMemoryTable(name, schema, partitioning, properties) {

Review Comment:
   Extending `InMemoryTable` was necessary because Spark required the table to 
support reads and writes once I started using the `InMemoryCatalog`. Setting up 
that read/write support from scratch looked difficult, so I decided to reuse 
what's already been built in `InMemoryTable`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to