cloud-fan commented on code in PR #55646:
URL: https://github.com/apache/spark/pull/55646#discussion_r3279910998
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -26,27 +26,93 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
/**
- * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered
catalogs, and allow
- * the caller to look up a catalog by name.
+ * A thread-safe contract for managing [[CatalogPlugin]]s. Implementations
resolve catalogs by
+ * name and maintain the current catalog and namespace for a session.
*
* There are still many commands (e.g. ANALYZE TABLE) that do not support v2
catalog API. They
* ignore the current catalog and blindly go to the v1 `SessionCatalog`. To
avoid tracking current
- * namespace in both `SessionCatalog` and `CatalogManger`, we let
`CatalogManager` to set/get
+ * namespace in both `SessionCatalog` and `CatalogManager`, implementations
set/get the
* current database of `SessionCatalog` when the current catalog is the
session catalog.
+ *
+ * Two implementations exist: [[DefaultCatalogManager]] owns the mutable
session state;
+ * [[TransactionAwareCatalogManager]] wraps another manager and redirects
catalog lookups to the
+ * active transaction's catalog.
*/
// TODO: all commands should look up table from the current catalog. The
`SessionCatalog` doesn't
// need to track current database at all.
-private[sql]
-class CatalogManager(
+private[sql] trait CatalogManager extends SQLConfHelper with Logging {
+
+ // ---- Underlying state exposed by implementations ----
+ def defaultSessionCatalog: CatalogPlugin
+ def v1SessionCatalog: SessionCatalog
+ def tempVariableManager: TempVariableManager
+
+ // ---- Catalog access ----
+ def catalog(name: String): CatalogPlugin
+ private[sql] def v2SessionCatalog: CatalogPlugin
+ def listCatalogs(pattern: Option[String]): Seq[String]
+ def currentCatalog: CatalogPlugin
+ def setCurrentCatalog(catalogName: String): Unit
+ def isCatalogRegistered(name: String): Boolean = {
+ try {
+ catalog(name)
+ true
+ } catch {
+ case _: CatalogNotFoundException => false
+ }
+ }
+
+ // ---- Transactions ----
+ def transaction: Option[Transaction] = None
+
+ def withTransaction(transaction: Transaction): CatalogManager
+
+ // ---- Namespace ----
+ def currentNamespace: Array[String]
+ def setCurrentNamespace(namespace: Array[String]): Unit
+
+ // ---- Session path ----
+ def sessionPathEntries: Option[Seq[SessionPathEntry]]
+ def storedSessionPathEntries: Option[Seq[SessionPathEntry]]
+ def confDefaultPathEntries: Option[Seq[SessionPathEntry]]
+ def setSessionPath(entries: Seq[SessionPathEntry]): Unit
+ def clearSessionPath(): Unit
+ private[sql] def copySessionPathFrom(other: CatalogManager): Unit
+ def currentPathString: String
+ def sqlResolutionPathEntries(
+ pathDefaultCatalog: String,
+ pathDefaultNamespace: Seq[String],
+ expandCatalog: String,
+ expandNamespace: Seq[String]): Seq[Seq[String]]
+ def sqlResolutionPathEntries(
+ currentCatalog: String,
+ currentNamespace: Seq[String]): Seq[Seq[String]]
+ def isSystemSessionOnPath: Boolean
+ def resolutionPathEntriesForAnalysis(
+ pinnedEntries: Option[Seq[Seq[String]]],
+ viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]]
+ def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionFunctionKind]
+
+ // Reset the manager to its initial state. Only used in tests.
+ private[sql] def reset(): Unit
+}
+
+/**
+ * Default [[CatalogManager]] implementation. Owns the mutable session state
+ * (registered catalogs, current catalog/namespace, session path).
+ */
+private[sql] class DefaultCatalogManager(
val defaultSessionCatalog: CatalogPlugin,
- val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
+ val v1SessionCatalog: SessionCatalog) extends CatalogManager {
Review Comment:
The constructor `val`s here (and `val tempVariableManager` at line 122)
implement abstract `def` members of the trait without the `override` modifier.
Scala does not require `override` when implementing an abstract member, but every
other member in this class uses `override def`. Adding `override` to these three
makes the implements-from-trait relationship explicit. It also means that if the
abstract member is later removed from the trait, compilation fails with an
"overrides nothing" error, instead of the class silently keeping an orphaned member.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala:
##########
@@ -18,47 +18,95 @@
package org.apache.spark.sql.connector.catalog
import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.catalog.TempVariableManager
+import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
import org.apache.spark.sql.connector.catalog.transactions.Transaction
/**
* A [[CatalogManager]] decorator that redirects catalog lookups to the
transaction's catalog
* instance when names match, ensuring table loads during analysis are scoped
to the transaction.
- * All mutable state (current catalog, current namespace, loaded catalogs) is
delegated to the
- * wrapped [[CatalogManager]].
+ * All mutable session state is delegated to the wrapped [[CatalogManager]].
*/
-// TODO: Extracting a CatalogManager trait (so this class can implement it
instead of extending
-// CatalogManager) would eliminate the inherited mutable state that this
decorator doesn't use.
private[sql] class TransactionAwareCatalogManager(
delegate: CatalogManager,
- txn: Transaction)
- extends CatalogManager(delegate.defaultSessionCatalog,
delegate.v1SessionCatalog) {
+ txn: Transaction) extends CatalogManager {
- override val tempVariableManager: TempVariableManager =
delegate.tempVariableManager
-
- override def transaction: Option[Transaction] = Some(txn)
-
- override def withTransaction(newTxn: Transaction): CatalogManager =
- throw SparkException.internalError("Cannot nest transactions: a
transaction is already active.")
+ // ---- Underlying state: pure delegation. ----
+ override def defaultSessionCatalog: CatalogPlugin =
delegate.defaultSessionCatalog
+ override def v1SessionCatalog: SessionCatalog = delegate.v1SessionCatalog
+ override def tempVariableManager: TempVariableManager =
delegate.tempVariableManager
+ // ---- Catalog access: redirect to txn catalog when names match. ----
override def catalog(name: String): CatalogPlugin = {
val resolved = delegate.catalog(name)
if (txn.catalog.name() == resolved.name()) txn.catalog else resolved
}
+ override private[sql] def v2SessionCatalog: CatalogPlugin =
delegate.v2SessionCatalog
+
+ override def listCatalogs(pattern: Option[String]): Seq[String] =
+ delegate.listCatalogs(pattern)
+
+ override def transaction: Option[Transaction] = Some(txn)
+
+ override def withTransaction(newTxn: Transaction): CatalogManager =
+ throw SparkException.internalError("Cannot nest transactions: a
transaction is already active.")
+
override def currentCatalog: CatalogPlugin = {
val c = delegate.currentCatalog
if (txn.catalog.name() == c.name()) txn.catalog else c
}
+ override def setCurrentCatalog(catalogName: String): Unit =
+ delegate.setCurrentCatalog(catalogName)
+
override def currentNamespace: Array[String] = delegate.currentNamespace
override def setCurrentNamespace(namespace: Array[String]): Unit =
delegate.setCurrentNamespace(namespace)
- override def setCurrentCatalog(catalogName: String): Unit =
- delegate.setCurrentCatalog(catalogName)
+ override def sessionPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.sessionPathEntries
- override def listCatalogs(pattern: Option[String]): Seq[String] =
- delegate.listCatalogs(pattern)
+ override def storedSessionPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.storedSessionPathEntries
+
+ override def confDefaultPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.confDefaultPathEntries
+
+ override def setSessionPath(entries: Seq[SessionPathEntry]): Unit =
+ delegate.setSessionPath(entries)
+
+ override def clearSessionPath(): Unit = delegate.clearSessionPath()
+
+ override private[sql] def copySessionPathFrom(other: CatalogManager): Unit =
+ delegate.copySessionPathFrom(other)
+
+ override def currentPathString: String = delegate.currentPathString
+
+ override def sqlResolutionPathEntries(
+ pathDefaultCatalog: String,
+ pathDefaultNamespace: Seq[String],
+ expandCatalog: String,
+ expandNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.sqlResolutionPathEntries(
+ pathDefaultCatalog, pathDefaultNamespace, expandCatalog, expandNamespace)
+
+ override def sqlResolutionPathEntries(
+ currentCatalog: String,
+ currentNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.sqlResolutionPathEntries(currentCatalog, currentNamespace)
+
+ override def isSystemSessionOnPath: Boolean = delegate.isSystemSessionOnPath
+
+ override def resolutionPathEntriesForAnalysis(
+ pinnedEntries: Option[Seq[Seq[String]]],
+ viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.resolutionPathEntriesForAnalysis(pinnedEntries,
viewCatalogAndNamespace)
+
+ override def sessionFunctionKindsForUnqualifiedResolution():
Seq[SessionFunctionKind] =
+ delegate.sessionFunctionKindsForUnqualifiedResolution()
+
+ override private[sql] def reset(): Unit = delegate.reset()
Review Comment:
Worth adding a one-line comment here, because this is a behavior change from the
pre-refactor code. The old `reset()` was inherited from `CatalogManager`. It reset
the `TransactionAwareCatalogManager`'s (TACM's) own, separately initialized
superclass state without touching the delegate. The new override forwards to the
delegate, so it clobbers the underlying `DefaultCatalogManager`: it clears its
catalogs map, session path, current catalog/namespace, etc. `reset()` is test-only
and not currently called on a TACM, so this is harmless today. However, a future
test that calls it would silently affect session-level state outside the
transaction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]