hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138916077


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.StructType
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.4.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession
+      .newDataFrame { builder =>
+        builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
+      }
+      .count()
+  }
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  override def listDatabases(): Dataset[Database] = {
+    sparkSession.newDataset(CatalogImpl.databaseEncoder) { builder =>
+      builder.getCatalogBuilder.getListDatabasesBuilder
+    }
+  }
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    sparkSession.newDataset(CatalogImpl.tableEncoder) { builder =>
+      builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    sparkSession.newDataset(CatalogImpl.functionEncoder) { builder =>
+      builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder
+        .setTableName(tableName)
+        .setDbName(dbName)
+    }
+  }
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  override def getDatabase(dbName: String): Database = {
+    sparkSession
+      .newDataset(CatalogImpl.databaseEncoder) { builder =>
+        builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  override def getTable(tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.4.0
+   */
+  override def getTable(dbName: String, tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  override def getFunction(functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.4.0
+   */
+  override def getFunction(dbName: String, functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.4.0
+   */
+  override def databaseExists(dbName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDatabaseExistsBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  override def tableExists(tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.4.0
+   */
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  override def functionExists(functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.4.0
+   */
+  override def functionExists(dbName: String, functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If 
no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.4.0
+   */
+  override def createTable(tableName: String, path: String): DataFrame = {
+    sparkSession.newDataFrame { builder =>

Review Comment:
   This is something that should be done on the planner side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to