andreaschat-db commented on code in PR #55646:
URL: https://github.com/apache/spark/pull/55646#discussion_r3280766608
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -26,27 +26,93 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
/**
- * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered
catalogs, and allow
- * the caller to look up a catalog by name.
+ * A thread-safe contract for managing [[CatalogPlugin]]s. Implementations
resolve catalogs by
+ * name and maintain the current catalog and namespace for a session.
*
* There are still many commands (e.g. ANALYZE TABLE) that do not support v2
catalog API. They
* ignore the current catalog and blindly go to the v1 `SessionCatalog`. To
avoid tracking current
- * namespace in both `SessionCatalog` and `CatalogManger`, we let
`CatalogManager` to set/get
+ * namespace in both `SessionCatalog` and `CatalogManager`, implementations
set/get the
* current database of `SessionCatalog` when the current catalog is the
session catalog.
+ *
+ * Two implementations exist: [[DefaultCatalogManager]] owns the mutable
session state;
+ * [[TransactionAwareCatalogManager]] wraps another manager and redirects
catalog lookups to the
+ * active transaction's catalog.
*/
// TODO: all commands should look up table from the current catalog. The
`SessionCatalog` doesn't
// need to track current database at all.
-private[sql]
-class CatalogManager(
+private[sql] trait CatalogManager extends SQLConfHelper with Logging {
+
+ // ---- Underlying state exposed by implementations ----
+ def defaultSessionCatalog: CatalogPlugin
+ def v1SessionCatalog: SessionCatalog
+ def tempVariableManager: TempVariableManager
+
+ // ---- Catalog access ----
+ def catalog(name: String): CatalogPlugin
+ private[sql] def v2SessionCatalog: CatalogPlugin
+ def listCatalogs(pattern: Option[String]): Seq[String]
+ def currentCatalog: CatalogPlugin
+ def setCurrentCatalog(catalogName: String): Unit
+ def isCatalogRegistered(name: String): Boolean = {
+ try {
+ catalog(name)
+ true
+ } catch {
+ case _: CatalogNotFoundException => false
+ }
+ }
+
+ // ---- Transactions ----
+ def transaction: Option[Transaction] = None
+
+ def withTransaction(transaction: Transaction): CatalogManager
+
+ // ---- Namespace ----
+ def currentNamespace: Array[String]
+ def setCurrentNamespace(namespace: Array[String]): Unit
+
+ // ---- Session path ----
+ def sessionPathEntries: Option[Seq[SessionPathEntry]]
+ def storedSessionPathEntries: Option[Seq[SessionPathEntry]]
+ def confDefaultPathEntries: Option[Seq[SessionPathEntry]]
+ def setSessionPath(entries: Seq[SessionPathEntry]): Unit
+ def clearSessionPath(): Unit
+ private[sql] def copySessionPathFrom(other: CatalogManager): Unit
+ def currentPathString: String
+ def sqlResolutionPathEntries(
+ pathDefaultCatalog: String,
+ pathDefaultNamespace: Seq[String],
+ expandCatalog: String,
+ expandNamespace: Seq[String]): Seq[Seq[String]]
+ def sqlResolutionPathEntries(
+ currentCatalog: String,
+ currentNamespace: Seq[String]): Seq[Seq[String]]
+ def isSystemSessionOnPath: Boolean
+ def resolutionPathEntriesForAnalysis(
+ pinnedEntries: Option[Seq[Seq[String]]],
+ viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]]
+ def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionFunctionKind]
+
+ // Reset the manager to its initial state. Only used in tests.
+ private[sql] def reset(): Unit
+}
+
+/**
+ * Default [[CatalogManager]] implementation. Owns the mutable session state
+ * (registered catalogs, current catalog/namespace, session path).
+ */
+private[sql] class DefaultCatalogManager(
val defaultSessionCatalog: CatalogPlugin,
- val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
+ val v1SessionCatalog: SessionCatalog) extends CatalogManager {
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]