Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/17433#discussion_r108115663
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -179,88 +132,295 @@ private[sql] class SessionState(
}
}
-
private[sql] object SessionState {
+ /**
+ * Create a new [[SessionState]] for the given session.
+ */
+ def apply(session: SparkSession): SessionState = {
+ new SessionStateBuilder(session).build
+ }
+
+ def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf):
Configuration = {
+ val newHadoopConf = new Configuration(hadoopConf)
+ sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null)
newHadoopConf.set(k, v) }
+ newHadoopConf
+ }
+}
+
+/**
+ * Builder class that coordinates construction of a new [[SessionState]].
+ *
+ * The builder explicitly defines all components needed by the session
state, and creates a session
+ * state when `build` is called. Components should only be initialized
once. This is not a problem
+ * for most components as they are only used in the `build` function.
However some components
+ * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` &
`sqlParser`) are as dependencies
+ * for other components and are shared as a result. These components are
defined as lazy vals to
+ * make sure the component is created only once.
+ *
+ * A developer can modify the builder by providing custom versions of
components, or by using the
+ * hooks provided for the analyzer, optimizer & planner. There are some
dependencies between the
+ * components (they are documented per dependency), a developer should
respect these when making
+ * modifications in order to prevent initialization problems.
+ *
+ * A parent [[SessionState]] can be used to initialize the new
[[SessionState]]. The new session
+ * state will clone the parent sessions state's `conf`,
`functionRegistry`, `experimentalMethods`
+ * and `catalog` fields. Note that the state is cloned when `build` is
called, and not before.
+ */
+@Experimental
[email protected]
+abstract class BaseSessionStateBuilder(
+ val session: SparkSession,
+ val parentState: Option[SessionState] = None) {
+ type NewBuilder = (SparkSession, Option[SessionState]) =>
BaseSessionStateBuilder
- def apply(sparkSession: SparkSession): SessionState = {
- apply(sparkSession, new SQLConf)
+ /**
+ * Extract entries from `SparkConf` and put them in the `SQLConf`
+ */
+ protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf):
Unit = {
+ sparkConf.getAll.foreach { case (k, v) =>
+ sqlConf.setConfString(k, v)
+ }
}
- def apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState = {
- val sparkContext = sparkSession.sparkContext
+ /**
+ * SQL-specific key-value configurations.
+ *
+ * These either get cloned from a pre-existing instance or newly
created. The conf is always
+ * merged with its [[SparkConf]].
+ */
+ protected lazy val conf: SQLConf = {
+ val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
+ mergeSparkConf(conf, session.sparkContext.conf)
+ conf
+ }
- // Automatically extract all entries and put them in our SQLConf
- mergeSparkConf(sqlConf, sparkContext.getConf)
+ /**
+ * Internal catalog managing functions registered by the user.
+ *
+ * This either gets cloned from a pre-existing version or cloned from
the build-in registry.
+ */
+ protected lazy val functionRegistry: FunctionRegistry = {
+
parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
+ }
- val functionRegistry = FunctionRegistry.builtin.clone()
+ /**
+ * Experimental methods that can be used to define custom optimization
rules and custom planning
+ * strategies.
+ *
+ * This either gets cloned from a pre-existing version or newly created.
+ */
+ protected lazy val experimentalMethods: ExperimentalMethods = {
+ parentState.map(_.experimentalMethods.clone()).getOrElse(new
ExperimentalMethods)
+ }
- val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
+ /**
+ * Parser that extracts expressions, plans, table identifiers etc. from
SQL texts.
+ *
+ * Note: this depends on the `conf` field.
+ */
+ protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
+ /**
+ * Catalog for managing table and database states. If there is a
pre-existing catalog, the state
+ * of that catalog (temp tables & current database) will be copied into
the new catalog.
+ *
+ * Note: this depends on the `conf`, `functionRegistry` and `sqlParser`
fields.
+ */
+ protected lazy val catalog: SessionCatalog = {
val catalog = new SessionCatalog(
- sparkSession.sharedState.externalCatalog,
- sparkSession.sharedState.globalTempViewManager,
+ session.sharedState.externalCatalog,
+ session.sharedState.globalTempViewManager,
functionRegistry,
- sqlConf,
- newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
- sqlParser)
+ conf,
+ SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration,
conf),
+ sqlParser,
+ new SessionFunctionResourceLoader(session))
+ parentState.foreach(_.catalog.copyStateTo(catalog))
+ catalog
+ }
- val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf)
+ /**
+ * Logical query plan analyzer for resolving unresolved attributes and
relations.
+ *
+ * Note: this depends on the `conf` and `catalog` field.
--- End diff --
Yeah this also depends on the `SessionState` and its `sqlContext` wrapper.
The main goal of these notes were to document the dependencies within the
builder.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]