pan3793 commented on code in PR #52995:
URL: https://github.com/apache/spark/pull/52995#discussion_r2516558178
##########
sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala:
##########
@@ -343,4 +343,208 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSparkSession
}
}
}
+
+ test("SparkConnectDatabaseMetaData getTableTypes") {
+ withConnection { conn =>
+ val metadata = conn.getMetaData
+ Using.resource(metadata.getTableTypes) { rs =>
+ val types = new Iterator[String] {
+ def hasNext: Boolean = rs.next()
+ def next(): String = rs.getString("TABLE_TYPE")
+ }.toSeq
+ // results are ordered by TABLE_TYPE
+ assert(types === Seq("TABLE", "VIEW"))
+ }
+ }
+ }
+
+ test("SparkConnectDatabaseMetaData getTables") {
+
+ case class GetTableResult(
+ TABLE_CAT: String,
+ TABLE_SCHEM: String,
+ TABLE_NAME: String,
+ TABLE_TYPE: String,
+ REMARKS: String,
+ TYPE_CAT: String,
+ TYPE_SCHEM: String,
+ TYPE_NAME: String,
+ SELF_REFERENCING_COL_NAME: String,
+ REF_GENERATION: String)
+
+ def verifyEmptyStringFields(result: GetTableResult): Unit = {
+ assert(result.REMARKS === "")
+ assert(result.TYPE_CAT === "")
+ assert(result.TYPE_SCHEM === "")
+ assert(result.TYPE_NAME === "")
+ assert(result.SELF_REFERENCING_COL_NAME === "")
+ assert(result.REF_GENERATION === "")
+ }
+
+ def verifyGetTables(
+ getTables: () => ResultSet)(verify: Seq[GetTableResult] => Unit): Unit = {
+ Using.resource(getTables()) { rs =>
+ val getTableResults = new Iterator[GetTableResult] {
+ def hasNext: Boolean = rs.next()
+ def next(): GetTableResult = GetTableResult(
+ TABLE_CAT = rs.getString("TABLE_CAT"),
+ TABLE_SCHEM = rs.getString("TABLE_SCHEM"),
+ TABLE_NAME = rs.getString("TABLE_NAME"),
+ TABLE_TYPE = rs.getString("TABLE_TYPE"),
+ REMARKS = rs.getString("REMARKS"),
+ TYPE_CAT = rs.getString("TYPE_CAT"),
+ TYPE_SCHEM = rs.getString("TYPE_SCHEM"),
+ TYPE_NAME = rs.getString("TYPE_NAME"),
+ SELF_REFERENCING_COL_NAME = rs.getString("SELF_REFERENCING_COL_NAME"),
+ REF_GENERATION = rs.getString("REF_GENERATION"))
+ }.toSeq
+ verify(getTableResults)
+ }
+ }
+
+ withConnection { conn =>
+ implicit val spark: SparkSession = conn.asInstanceOf[SparkConnectConnection].spark
+
+ // this catalog does not support views
+ registerCatalog("testcat", TEST_IN_MEMORY_CATALOG)
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS testcat.t_db1")
+ spark.sql("CREATE TABLE IF NOT EXISTS testcat.t_db1.t_t1 (id INT)")
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db1")
+ spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db1.t1 (id INT)")
+ spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db1.t_2 (id INT)")
+ spark.sql(
+ """CREATE VIEW IF NOT EXISTS spark_catalog.db1.t1_v AS
+ |SELECT id FROM spark_catalog.db1.t1
+ |""".stripMargin)
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db_2")
+ spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db_2.t_2 (id INT)")
+ spark.sql(
+ """CREATE VIEW IF NOT EXISTS spark_catalog.db_2.t_2_v AS
+ |SELECT id FROM spark_catalog.db_2.t_2
+ |""".stripMargin)
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db_")
+ spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db_.t_ (id INT)")
+
+ val metadata = conn.getMetaData
+
+ // no need to clean up "testcat" because it is memory-based and session-isolated,
+ // hence inaccessible from other SparkSessions
+ withDatabase("spark_catalog.db1", "spark_catalog.db_2", "spark_catalog.db_") {
+ // list tables in all catalogs and schemas
+ val getTablesInAllCatalogsAndSchemas = List(null, "%").flatMap { database =>
+ List(null, "%").flatMap { table =>
+ List(null, Array("TABLE", "VIEW")).map { tableTypes =>
+ () => metadata.getTables(null, database, table, tableTypes)
+ }
+ }
+ }
+
+ getTablesInAllCatalogsAndSchemas.foreach { getTables =>
+ verifyGetTables(getTables) { getTableResults =>
+ // results are ordered by TABLE_TYPE, TABLE_CAT, TABLE_SCHEM and TABLE_NAME
+ assert {
+ getTableResults.map { result =>
+ (result.TABLE_TYPE, result.TABLE_CAT, result.TABLE_SCHEM, result.TABLE_NAME)
+ } === Seq(
+ ("TABLE", "spark_catalog", "db1", "t1"),
+ ("TABLE", "spark_catalog", "db1", "t_2"),
+ ("TABLE", "spark_catalog", "db_", "t_"),
+ ("TABLE", "spark_catalog", "db_2", "t_2"),
+ ("TABLE", "testcat", "t_db1", "t_t1"),
+ ("VIEW", "spark_catalog", "db1", "t1_v"),
+ ("VIEW", "spark_catalog", "db_2", "t_2_v"))
+ }
+ getTableResults.foreach(verifyEmptyStringFields)
+ }
+ }
+
+ // list tables with unsupported table types
+ val se = intercept[SQLException] {
+ metadata.getTables("spark_catalog", "foo", "bar", Array("TABLE", "MATERIALIZED VIEW"))
Review Comment:
fixed
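
For readers skimming the hunk above: each `ResultSet` is drained through an anonymous `Iterator` whose `hasNext` delegates to `rs.next()`, and `toSeq` materializes the rows before `Using.resource` closes the result set. A minimal standalone sketch of the same idiom, assuming only a plain `java.sql.Connection` (the `listTableTypes` helper name is illustrative, not from the PR):

```scala
import java.sql.Connection
import scala.util.Using

// Drain a JDBC ResultSet into a strict Seq, as the test above does.
// Caveat: hasNext advances the cursor via rs.next(), so it must be called
// exactly once per next(); toSeq's alternating hasNext/next calls satisfy
// that contract, and on Scala 2.13 toSeq is strict, so all rows are read
// before Using.resource closes the ResultSet.
def listTableTypes(conn: Connection): Seq[String] =
  Using.resource(conn.getMetaData.getTableTypes) { rs =>
    new Iterator[String] {
      def hasNext: Boolean = rs.next()
      def next(): String = rs.getString("TABLE_TYPE")
    }.toSeq
  }
```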
##########
sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala:
##########
@@ -343,4 +343,208 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSparkSession
}
}
}
+
+ test("SparkConnectDatabaseMetaData getTableTypes") {
+ withConnection { conn =>
+ val metadata = conn.getMetaData
+ Using.resource(metadata.getTableTypes) { rs =>
+ val types = new Iterator[String] {
+ def hasNext: Boolean = rs.next()
+ def next(): String = rs.getString("TABLE_TYPE")
+ }.toSeq
+ // results are ordered by TABLE_TYPE
+ assert(types === Seq("TABLE", "VIEW"))
+ }
+ }
+ }
+
+ test("SparkConnectDatabaseMetaData getTables") {
+
+ case class GetTableResult(
+ TABLE_CAT: String,
+ TABLE_SCHEM: String,
+ TABLE_NAME: String,
+ TABLE_TYPE: String,
+ REMARKS: String,
+ TYPE_CAT: String,
+ TYPE_SCHEM: String,
+ TYPE_NAME: String,
+ SELF_REFERENCING_COL_NAME: String,
+ REF_GENERATION: String)
+
+ def verifyEmptyStringFields(result: GetTableResult): Unit = {
+ assert(result.REMARKS === "")
+ assert(result.TYPE_CAT === "")
+ assert(result.TYPE_SCHEM === "")
+ assert(result.TYPE_NAME === "")
+ assert(result.SELF_REFERENCING_COL_NAME === "")
+ assert(result.REF_GENERATION === "")
+ }
+
+ def verifyGetTables(
+ getTables: () => ResultSet)(verify: Seq[GetTableResult] => Unit): Unit = {
+ Using.resource(getTables()) { rs =>
+ val getTableResults = new Iterator[GetTableResult] {
+ def hasNext: Boolean = rs.next()
+ def next(): GetTableResult = GetTableResult(
+ TABLE_CAT = rs.getString("TABLE_CAT"),
+ TABLE_SCHEM = rs.getString("TABLE_SCHEM"),
+ TABLE_NAME = rs.getString("TABLE_NAME"),
+ TABLE_TYPE = rs.getString("TABLE_TYPE"),
+ REMARKS = rs.getString("REMARKS"),
+ TYPE_CAT = rs.getString("TYPE_CAT"),
+ TYPE_SCHEM = rs.getString("TYPE_SCHEM"),
+ TYPE_NAME = rs.getString("TYPE_NAME"),
+ SELF_REFERENCING_COL_NAME = rs.getString("SELF_REFERENCING_COL_NAME"),
+ REF_GENERATION = rs.getString("REF_GENERATION"))
+ }.toSeq
+ verify(getTableResults)
+ }
+ }
+
+ withConnection { conn =>
+ implicit val spark: SparkSession = conn.asInstanceOf[SparkConnectConnection].spark
+
+ // this catalog does not support views
+ registerCatalog("testcat", TEST_IN_MEMORY_CATALOG)
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS testcat.t_db1")
+ spark.sql("CREATE TABLE IF NOT EXISTS testcat.t_db1.t_t1 (id INT)")
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db1")
+ spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db1.t1 (id INT)")
+ spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db1.t_2 (id INT)")
+ spark.sql(
+ """CREATE VIEW IF NOT EXISTS spark_catalog.db1.t1_v AS
+ |SELECT id FROM spark_catalog.db1.t1
+ |""".stripMargin)
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db_2")
+ spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db_2.t_2 (id INT)")
+ spark.sql(
+ """CREATE VIEW IF NOT EXISTS spark_catalog.db_2.t_2_v AS
+ |SELECT id FROM spark_catalog.db_2.t_2
+ |""".stripMargin)
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db_")
+ spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db_.t_ (id INT)")
+
+ val metadata = conn.getMetaData
+
+ // no need to clean up "testcat" because it is memory-based and session-isolated,
+ // hence inaccessible from other SparkSessions
+ withDatabase("spark_catalog.db1", "spark_catalog.db_2", "spark_catalog.db_") {
+ // list tables in all catalogs and schemas
+ val getTablesInAllCatalogsAndSchemas = List(null, "%").flatMap { database =>
+ List(null, "%").flatMap { table =>
+ List(null, Array("TABLE", "VIEW")).map { tableTypes =>
+ () => metadata.getTables(null, database, table, tableTypes)
+ }
+ }
+ }
+
+ getTablesInAllCatalogsAndSchemas.foreach { getTables =>
+ verifyGetTables(getTables) { getTableResults =>
+ // results are ordered by TABLE_TYPE, TABLE_CAT, TABLE_SCHEM and TABLE_NAME
+ assert {
+ getTableResults.map { result =>
+ (result.TABLE_TYPE, result.TABLE_CAT, result.TABLE_SCHEM, result.TABLE_NAME)
+ } === Seq(
+ ("TABLE", "spark_catalog", "db1", "t1"),
+ ("TABLE", "spark_catalog", "db1", "t_2"),
+ ("TABLE", "spark_catalog", "db_", "t_"),
+ ("TABLE", "spark_catalog", "db_2", "t_2"),
+ ("TABLE", "testcat", "t_db1", "t_t1"),
+ ("VIEW", "spark_catalog", "db1", "t1_v"),
+ ("VIEW", "spark_catalog", "db_2", "t_2_v"))
+ }
+ getTableResults.foreach(verifyEmptyStringFields)
+ }
+ }
+
+ // list tables with unsupported table types
+ val se = intercept[SQLException] {
+ metadata.getTables("spark_catalog", "foo", "bar", Array("TABLE", "MATERIALIZED VIEW"))
+ }
+ assert(se.getMessage ===
+ "The requested table types contains unsupported items: METERIALIZED
VIEW. " +
Review Comment:
fixed
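
As a usage note, here is a hedged sketch of exercising the `getTables` call under test from a bare JDBC client; the `jdbc:sc://localhost:15002` URL is an assumed example endpoint, not something specified in this PR:

```scala
import java.sql.DriverManager
import scala.util.Using

object ListTables extends App {
  // Assumed example endpoint; adjust host/port to your Spark Connect server.
  Using.resource(DriverManager.getConnection("jdbc:sc://localhost:15002")) { conn =>
    val md = conn.getMetaData
    // A null schema/table pattern matches everything; "%" is the SQL LIKE wildcard.
    Using.resource(md.getTables(null, "%", "%", Array("TABLE", "VIEW"))) { rs =>
      while (rs.next()) {
        println(s"${rs.getString("TABLE_TYPE")}: ${rs.getString("TABLE_CAT")}." +
          s"${rs.getString("TABLE_SCHEM")}.${rs.getString("TABLE_NAME")}")
      }
    }
  }
}
```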