LuciferYang commented on code in PR #52995:
URL: https://github.com/apache/spark/pull/52995#discussion_r2516512606


##########
sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala:
##########
@@ -355,20 +356,124 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends Databas
   override def getSchemas(catalog: String, schemaPattern: String): ResultSet = {
     conn.checkOpen()
 
-    val df = getSchemasDataFrame(catalog, schemaPattern)
+    val df = getSchemasDataFrame(catalog, Some(schemaPattern))
       .orderBy("TABLE_CATALOG", "TABLE_SCHEM")
     new SparkConnectResultSet(df.collectResult())
   }
 
-  override def getTableTypes: ResultSet =
-    throw new SQLFeatureNotSupportedException
+  override def getTableTypes: ResultSet = {
+    conn.checkOpen()
+
+    val df = TABLE_TYPES.toDF("TABLE_TYPE")
+      .orderBy("TABLE_TYPE")
+    new SparkConnectResultSet(df.collectResult())
+  }
+
+  // Schema of the returned DataFrame is:
+  // |-- TABLE_CAT: string (nullable = false)
+  // |-- TABLE_SCHEM: string (nullable = false)
+  // |-- TABLE_NAME: string (nullable = false)
+  // |-- TABLE_TYPE: string (nullable = false)
+  // |-- REMARKS: string (nullable = false)
+  // |-- TYPE_CAT: string (nullable = false)
+  // |-- TYPE_SCHEM: string (nullable = false)
+  // |-- TYPE_NAME: string (nullable = false)
+  // |-- SELF_REFERENCING_COL_NAME: string (nullable = false)
+  // |-- REF_GENERATION: string (nullable = false)
+  private def getTablesDataFrame(
+      catalog: String,
+      schemaPattern: String,
+      tableNamePattern: String): connect.DataFrame = {
+
+    val catalogSchemasDf = if (schemaPattern == "") {

Review Comment:
   Can we ensure that the `schemaPattern` here is not `null`?
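   
   For reference, the `java.sql.DatabaseMetaData` javadoc allows callers to pass `null` here (meaning the schema name should not be used to narrow the search), while `""` matches only objects without a schema, so the two cases need distinct handling. A minimal sketch of one possible null-safe guard, using an illustrative helper name that is not part of this PR:
   
   ```scala
   // Hypothetical sketch, not this PR's code: normalize the nullable JDBC
   // schemaPattern argument before any string comparison such as `== ""`.
   object SchemaPatternSketch {
     // None    => caller passed null: do not narrow the search by schema
     // Some(p) => narrow with pattern p; "" matches only objects without a schema
     def toSchemaFilter(schemaPattern: String): Option[String] =
       Option(schemaPattern)
   
     def main(args: Array[String]): Unit = {
       assert(toSchemaFilter(null).isEmpty)
       assert(toSchemaFilter("").contains(""))
       assert(toSchemaFilter("db%").contains("db%"))
     }
   }
   ```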



##########
sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala:
##########
@@ -343,4 +343,208 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
       }
     }
   }
+
+  test("SparkConnectDatabaseMetaData getTableTypes") {
+    withConnection { conn =>
+      val metadata = conn.getMetaData
+      Using.resource(metadata.getTableTypes) { rs =>
+        val types = new Iterator[String] {
+          def hasNext: Boolean = rs.next()
+          def next(): String = rs.getString("TABLE_TYPE")
+        }.toSeq
+        // results are ordered by TABLE_TYPE
+        assert(types === Seq("TABLE", "VIEW"))
+      }
+    }
+  }
+
+  test("SparkConnectDatabaseMetaData getTables") {
+
+    case class GetTableResult(
+       TABLE_CAT: String,
+       TABLE_SCHEM: String,
+       TABLE_NAME: String,
+       TABLE_TYPE: String,
+       REMARKS: String,
+       TYPE_CAT: String,
+       TYPE_SCHEM: String,
+       TYPE_NAME: String,
+       SELF_REFERENCING_COL_NAME: String,
+       REF_GENERATION: String)
+
+    def verifyEmptyStringFields(result: GetTableResult): Unit = {
+      assert(result.REMARKS === "")
+      assert(result.TYPE_CAT === "")
+      assert(result.TYPE_SCHEM === "")
+      assert(result.TYPE_NAME === "")
+      assert(result.SELF_REFERENCING_COL_NAME === "")
+      assert(result.REF_GENERATION === "")
+    }
+
+    def verifyGetTables(
+        getTables: () => ResultSet)(verify: Seq[GetTableResult] => Unit): Unit = {
+      Using.resource(getTables()) { rs =>
+        val getTableResults = new Iterator[GetTableResult] {
+          def hasNext: Boolean = rs.next()
+          def next(): GetTableResult = GetTableResult(
+            TABLE_CAT = rs.getString("TABLE_CAT"),
+            TABLE_SCHEM = rs.getString("TABLE_SCHEM"),
+            TABLE_NAME = rs.getString("TABLE_NAME"),
+            TABLE_TYPE = rs.getString("TABLE_TYPE"),
+            REMARKS = rs.getString("REMARKS"),
+            TYPE_CAT = rs.getString("TYPE_CAT"),
+            TYPE_SCHEM = rs.getString("TYPE_SCHEM"),
+            TYPE_NAME = rs.getString("TYPE_NAME"),
+            SELF_REFERENCING_COL_NAME = rs.getString("SELF_REFERENCING_COL_NAME"),
+            REF_GENERATION = rs.getString("REF_GENERATION"))
+        }.toSeq
+        verify(getTableResults)
+      }
+    }
+
+    withConnection { conn =>
+      implicit val spark: SparkSession = conn.asInstanceOf[SparkConnectConnection].spark
+
+      // this catalog does not support views
+      registerCatalog("testcat", TEST_IN_MEMORY_CATALOG)
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS testcat.t_db1")
+      spark.sql("CREATE TABLE IF NOT EXISTS testcat.t_db1.t_t1 (id INT)")
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db1")
+      spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db1.t1 (id INT)")
+      spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db1.t_2 (id INT)")
+      spark.sql(
+        """CREATE VIEW IF NOT EXISTS spark_catalog.db1.t1_v AS
+          |SELECT id FROM spark_catalog.db1.t1
+          |""".stripMargin)
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db_2")
+      spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db_2.t_2 (id INT)")
+      spark.sql(
+        """CREATE VIEW IF NOT EXISTS spark_catalog.db_2.t_2_v AS
+          |SELECT id FROM spark_catalog.db_2.t_2
+          |""".stripMargin)
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db_")
+      spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db_.t_ (id INT)")
+
+      val metadata = conn.getMetaData
+
+      // no need to care about "testcat" because it is memory-based and session-isolated,
+      // and is inaccessible from another SparkSession
+      withDatabase("spark_catalog.db1", "spark_catalog.db_2", "spark_catalog.db_") {
+        // list tables in all catalogs and schemas
+        val getTablesInAllCatalogsAndSchemas = List(null, "%").flatMap { database =>
+          List(null, "%").flatMap { table =>
+            List(null, Array("TABLE", "VIEW")).map { tableTypes =>
+              () => metadata.getTables(null, database, table, tableTypes)
+            }
+          }
+        }
+
+        getTablesInAllCatalogsAndSchemas.foreach { getTables =>
+          verifyGetTables(getTables) { getTableResults =>
+            // results are ordered by TABLE_TYPE, TABLE_CAT, TABLE_SCHEM and TABLE_NAME
+            assert {
+              getTableResults.map { result =>
+                (result.TABLE_TYPE, result.TABLE_CAT, result.TABLE_SCHEM, result.TABLE_NAME)
+              } === Seq(
+                ("TABLE", "spark_catalog", "db1", "t1"),
+                ("TABLE", "spark_catalog", "db1", "t_2"),
+                ("TABLE", "spark_catalog", "db_", "t_"),
+                ("TABLE", "spark_catalog", "db_2", "t_2"),
+                ("TABLE", "testcat", "t_db1", "t_t1"),
+                ("VIEW", "spark_catalog", "db1", "t1_v"),
+                ("VIEW", "spark_catalog", "db_2", "t_2_v"))
+            }
+            getTableResults.foreach(verifyEmptyStringFields)
+          }
+        }
+
+        // list tables with unsupported table types
+        val se = intercept[SQLException] {
+          metadata.getTables("spark_catalog", "foo", "bar", Array("TABLE", 
"METERIALIZED VIEW"))
+        }
+        assert(se.getMessage ===
+          "The requested table types contains unsupported items: METERIALIZED 
VIEW. " +

Review Comment:
   ditto
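   
   For context, the assertion above exercises the error path for unsupported table types. A hypothetical sketch of such a validation step (the helper name is illustrative and the message text only approximates the PR's):
   
   ```scala
   import java.sql.SQLException
   
   // Hypothetical sketch: reject requested table types outside the supported
   // set before building the result DataFrame.
   object TableTypeValidationSketch {
     val TABLE_TYPES: Seq[String] = Seq("TABLE", "VIEW")
   
     def validateTableTypes(types: Array[String]): Unit = {
       // Per JDBC, a null array means "return all types": nothing to validate.
       if (types != null) {
         val unsupported = types.filterNot(TABLE_TYPES.contains)
         if (unsupported.nonEmpty) {
           throw new SQLException(
             "The requested table types contains unsupported items: " +
               unsupported.mkString(", "))
         }
       }
     }
   }
   ```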



##########
sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala:
##########
@@ -343,4 +343,208 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
       }
     }
   }
+
+  test("SparkConnectDatabaseMetaData getTableTypes") {
+    withConnection { conn =>
+      val metadata = conn.getMetaData
+      Using.resource(metadata.getTableTypes) { rs =>
+        val types = new Iterator[String] {
+          def hasNext: Boolean = rs.next()
+          def next(): String = rs.getString("TABLE_TYPE")
+        }.toSeq
+        // results are ordered by TABLE_TYPE
+        assert(types === Seq("TABLE", "VIEW"))
+      }
+    }
+  }
+
+  test("SparkConnectDatabaseMetaData getTables") {
+
+    case class GetTableResult(
+       TABLE_CAT: String,
+       TABLE_SCHEM: String,
+       TABLE_NAME: String,
+       TABLE_TYPE: String,
+       REMARKS: String,
+       TYPE_CAT: String,
+       TYPE_SCHEM: String,
+       TYPE_NAME: String,
+       SELF_REFERENCING_COL_NAME: String,
+       REF_GENERATION: String)
+
+    def verifyEmptyStringFields(result: GetTableResult): Unit = {
+      assert(result.REMARKS === "")
+      assert(result.TYPE_CAT === "")
+      assert(result.TYPE_SCHEM === "")
+      assert(result.TYPE_NAME === "")
+      assert(result.SELF_REFERENCING_COL_NAME === "")
+      assert(result.REF_GENERATION === "")
+    }
+
+    def verifyGetTables(
+        getTables: () => ResultSet)(verify: Seq[GetTableResult] => Unit): Unit = {
+      Using.resource(getTables()) { rs =>
+        val getTableResults = new Iterator[GetTableResult] {
+          def hasNext: Boolean = rs.next()
+          def next(): GetTableResult = GetTableResult(
+            TABLE_CAT = rs.getString("TABLE_CAT"),
+            TABLE_SCHEM = rs.getString("TABLE_SCHEM"),
+            TABLE_NAME = rs.getString("TABLE_NAME"),
+            TABLE_TYPE = rs.getString("TABLE_TYPE"),
+            REMARKS = rs.getString("REMARKS"),
+            TYPE_CAT = rs.getString("TYPE_CAT"),
+            TYPE_SCHEM = rs.getString("TYPE_SCHEM"),
+            TYPE_NAME = rs.getString("TYPE_NAME"),
+            SELF_REFERENCING_COL_NAME = rs.getString("SELF_REFERENCING_COL_NAME"),
+            REF_GENERATION = rs.getString("REF_GENERATION"))
+        }.toSeq
+        verify(getTableResults)
+      }
+    }
+
+    withConnection { conn =>
+      implicit val spark: SparkSession = conn.asInstanceOf[SparkConnectConnection].spark
+
+      // this catalog does not support views
+      registerCatalog("testcat", TEST_IN_MEMORY_CATALOG)
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS testcat.t_db1")
+      spark.sql("CREATE TABLE IF NOT EXISTS testcat.t_db1.t_t1 (id INT)")
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db1")
+      spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db1.t1 (id INT)")
+      spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db1.t_2 (id INT)")
+      spark.sql(
+        """CREATE VIEW IF NOT EXISTS spark_catalog.db1.t1_v AS
+          |SELECT id FROM spark_catalog.db1.t1
+          |""".stripMargin)
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db_2")
+      spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db_2.t_2 (id INT)")
+      spark.sql(
+        """CREATE VIEW IF NOT EXISTS spark_catalog.db_2.t_2_v AS
+          |SELECT id FROM spark_catalog.db_2.t_2
+          |""".stripMargin)
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db_")
+      spark.sql("CREATE TABLE IF NOT EXISTS spark_catalog.db_.t_ (id INT)")
+
+      val metadata = conn.getMetaData
+
+      // no need to care about "testcat" because it is memory-based and session-isolated,
+      // and is inaccessible from another SparkSession
+      withDatabase("spark_catalog.db1", "spark_catalog.db_2", "spark_catalog.db_") {
+        // list tables in all catalogs and schemas
+        val getTablesInAllCatalogsAndSchemas = List(null, "%").flatMap { database =>
+          List(null, "%").flatMap { table =>
+            List(null, Array("TABLE", "VIEW")).map { tableTypes =>
+              () => metadata.getTables(null, database, table, tableTypes)
+            }
+          }
+        }
+
+        getTablesInAllCatalogsAndSchemas.foreach { getTables =>
+          verifyGetTables(getTables) { getTableResults =>
+            // results are ordered by TABLE_TYPE, TABLE_CAT, TABLE_SCHEM and TABLE_NAME
+            assert {
+              getTableResults.map { result =>
+                (result.TABLE_TYPE, result.TABLE_CAT, result.TABLE_SCHEM, result.TABLE_NAME)
+              } === Seq(
+                ("TABLE", "spark_catalog", "db1", "t1"),
+                ("TABLE", "spark_catalog", "db1", "t_2"),
+                ("TABLE", "spark_catalog", "db_", "t_"),
+                ("TABLE", "spark_catalog", "db_2", "t_2"),
+                ("TABLE", "testcat", "t_db1", "t_t1"),
+                ("VIEW", "spark_catalog", "db1", "t1_v"),
+                ("VIEW", "spark_catalog", "db_2", "t_2_v"))
+            }
+            getTableResults.foreach(verifyEmptyStringFields)
+          }
+        }
+
+        // list tables with unsupported table types
+        val se = intercept[SQLException] {
+          metadata.getTables("spark_catalog", "foo", "bar", Array("TABLE", 
"METERIALIZED VIEW"))

Review Comment:
   MATERIALIZED ?
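   
   Aside: once the typo is fixed, a minimal client-side usage of the new metadata methods goes through the standard `java.sql.DatabaseMetaData` API, as sketched below (the connection URL is a placeholder, not taken from this PR):
   
   ```scala
   import java.sql.DriverManager
   import scala.util.Using
   
   // Minimal usage sketch of the standard JDBC metadata flow; replace the
   // placeholder URL with a real Spark Connect endpoint.
   object GetTablesUsageSketch {
     def main(args: Array[String]): Unit = {
       Using.resource(DriverManager.getConnection("jdbc:sc://localhost:15002")) { conn =>
         val metadata = conn.getMetaData
         // null catalog/schema patterns mean "do not narrow the search"
         Using.resource(metadata.getTables(null, null, "%", Array("TABLE", "VIEW"))) { rs =>
           while (rs.next()) {
             println(rs.getString("TABLE_CAT") + "." +
               rs.getString("TABLE_SCHEM") + "." + rs.getString("TABLE_NAME"))
           }
         }
       }
     }
   }
   ```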


