cloud-fan commented on code in PR #38941:
URL: https://github.com/apache/spark/pull/38941#discussion_r1052282096


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala:
##########
@@ -232,4 +239,191 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       )
     }
   }
+
+  test("SPARK-41498: Metadata column is propagated through union") {
+    withTable(tbl) {
+      prepareTable()
+      val df = spark.table(tbl)
+      val dfQuery = df.union(df).select("id", "data", "index", "_partition")
+      val expectedAnswer =
+        Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))
+      checkAnswer(dfQuery, expectedAnswer ++ expectedAnswer)
+    }
+  }
+
+  test("SPARK-41498: Nested metadata column is propagated through union") {
+    withTempDir { dir =>
+      spark.range(start = 0, end = 10, step = 1, numPartitions = 1)
+        .write.mode("overwrite").save(dir.getAbsolutePath)
+      val df = spark.read.load(dir.getAbsolutePath)
+      val dfQuery = df.union(df).select("_metadata.file_path")
+
+      val filePath = dir.listFiles(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = name.endsWith(".parquet")
+      }).map(_.getAbsolutePath)
+      assert(filePath.length == 1)
+      val expectedAnswer = (1 to 20).map(_ => Row("file:" ++ filePath.head))
+      checkAnswer(dfQuery, expectedAnswer)
+    }
+  }
+
+  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
+    "have metadata output of different size") {
+    withTable(tbl) {
+      prepareTable()
+      withTempDir { dir =>
+        spark.range(start = 10, end = 20).selectExpr("bigint(id) as id", "string(id) as data")
+          .write.mode("overwrite").save(dir.getAbsolutePath)
+        val df1 = spark.table(tbl)
+        val df2 = spark.read.load(dir.getAbsolutePath)
+
+        // Make sure one df contains a metadata column and the other does not
+        assert(!df1.queryExecution.analyzed.metadataOutput.exists(_.name == "_metadata"))
+        assert(df2.queryExecution.analyzed.metadataOutput.exists(_.name == "_metadata"))
+
+        assert(df1.union(df2).queryExecution.analyzed.metadataOutput.isEmpty)
+      }
+    }
+  }
+}
+
+class MetadataTestTable extends Table with SupportsMetadataColumns {
+
+  override def name(): String = "fake"
+  override def schema(): StructType = StructType(Nil)
+  override def capabilities(): java.util.Set[TableCapability] =
+    java.util.EnumSet.of(TableCapability.BATCH_READ)
+  override def metadataColumns(): Array[MetadataColumn] =
+    Array(
+      new MetadataColumn {
+        override def name: String = "_metadata"
+        override def dataType: DataType = StructType(StructField("index", IntegerType) :: Nil)
+        override def comment: String = ""
+      }
+    )
+}
+
+class TypeMismatchTable extends MetadataTestTable {
+
+  override def metadataColumns(): Array[MetadataColumn] =
+    Array(
+      new MetadataColumn {
+        override def name: String = "_metadata"
+        override def dataType: DataType = StructType(StructField("index", StringType) :: Nil)
+        override def comment: String =
+          "Used to create a type mismatch with the metadata col in 
`MetadataTestTable`"
+      }
+    )
+}
+
+class AttrNameMismatchTable extends MetadataTestTable {
+  override def metadataColumns(): Array[MetadataColumn] =
+    Array(
+      new MetadataColumn {
+        override def name: String = "wrongName"
+        override def dataType: DataType = StructType(StructField("index", IntegerType) :: Nil)
+        override def comment: String =
+          "Used to create a name mismatch with the metadata col in 
`MetadataTestTable`"
+      })
+}
+
+class FieldNameMismatchTable extends MetadataTestTable {
+  override def metadataColumns(): Array[MetadataColumn] =
+    Array(
+      new MetadataColumn {
+        override def name: String = "_metadata"
+        override def dataType: DataType = StructType(StructField("wrongName", IntegerType) :: Nil)
+        override def comment: String =
+          "Used to create a name mismatch with the struct field in the 
metadata col of " +
+            "`MetadataTestTable`"
+      })
+}
+
+class MetadataTestCatalog extends TestV2SessionCatalogBase[MetadataTestTable] {
+  override protected def newTable(
+    name: String,
+    schema: StructType,
+    partitions: Array[Transform],
+    properties: java.util.Map[String, String]): MetadataTestTable = {
+    new MetadataTestTable
+  }
+}
+
+class MetadataTypeMismatchCatalog extends TestV2SessionCatalogBase[TypeMismatchTable] {
+  override protected def newTable(
+    name: String,
+    schema: StructType,
+    partitions: Array[Transform],
+    properties: java.util.Map[String, String]): TypeMismatchTable = {
+    new TypeMismatchTable
+  }
+}
+
+class MetadataAttrNameMismatchCatalog extends TestV2SessionCatalogBase[AttrNameMismatchTable] {
+  override protected def newTable(
+    name: String,
+    schema: StructType,
+    partitions: Array[Transform],
+    properties: java.util.Map[String, String]): AttrNameMismatchTable = {
+    new AttrNameMismatchTable
+  }
+}
+
+class MetadataFieldNameMismatchCatalog extends TestV2SessionCatalogBase[FieldNameMismatchTable] {
+  override protected def newTable(
+    name: String,
+    schema: StructType,
+    partitions: Array[Transform],
+    properties: java.util.Map[String, String]): FieldNameMismatchTable = {
+    new FieldNameMismatchTable
+  }
+}
+
+class MetadataColumnMismatchSuite extends QueryTest with SharedSparkSession {

Review Comment:
   We don't need to add a new test suite. We can just register the newly created catalogs under different names by adding a `beforeAll` to `MetadataColumnSuite`:
   ```
   override def beforeAll(): Unit = {
     spark.conf.set("spark.sql.catalog.typeMismatch", classOf[MetadataTypeMismatchCatalog].getName)
     ...
     super.beforeAll()
   }
   ```
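
   For reference, once those catalogs are registered, a test in `MetadataColumnSuite` could look roughly like the sketch below. The `metadataTest`/`typeMismatch` catalog names and the table setup are placeholders, and the final assertion is illustrative, to be aligned with whatever the union resolution is expected to produce:
   ```
   test("SPARK-41498: Union with mismatched metadata column types") {
     // Assumes `metadataTest` maps to MetadataTestCatalog and `typeMismatch` to
     // MetadataTypeMismatchCatalog via the conf entries set in `beforeAll`.
     // Their `_metadata.index` fields are int vs. string, which creates the type mismatch.
     withTable("metadataTest.tbl", "typeMismatch.tbl") {
       sql("CREATE TABLE metadataTest.tbl (id bigint, data string)")
       sql("CREATE TABLE typeMismatch.tbl (id bigint, data string)")
       val unioned = spark.table("metadataTest.tbl").union(spark.table("typeMismatch.tbl"))
       // Illustrative expectation: the mismatched `_metadata` column is not propagated.
       assert(unioned.queryExecution.analyzed.metadataOutput.isEmpty)
     }
   }
   ```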


