This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d27496eb3bf [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods d27496eb3bf is described below commit d27496eb3bf962981e37f989ba486d847745444f Author: Herman van Hovell <her...@databricks.com> AuthorDate: Thu Aug 10 09:49:45 2023 +0900 [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods ### What changes were proposed in this pull request? This PR adds a couple of methods to SparkSession.Builder: - `config` - this group of methods allows you to set runtime configurations on the Spark Connect Session. - `master` - this is a no-op, it is only added for compatibility. - `appName` - this is a no-op, it is only added for compatibility. - `enableHiveSupport` - this is a no-op, it is only added for compatibility. ### Why are the changes needed? We want to maximize compatibility with the existing API in sql/core. ### Does this PR introduce _any_ user-facing change? Yes. It adds a couple of builder methods. ### How was this patch tested? Add tests to `SparkSessionSuite` and `SparkSessionE2ESuite`. Closes #42419 from hvanhovell/SPARK-44747. 
Authored-by: Herman van Hovell <her...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../scala/org/apache/spark/sql/SparkSession.scala | 91 +++++++++++++++++++++- .../apache/spark/sql/SparkSessionE2ESuite.scala | 46 +++++++++++ .../org/apache/spark/sql/SparkSessionSuite.scala | 10 +++ .../CheckConnectJvmClientCompatibility.scala | 6 -- 4 files changed, 146 insertions(+), 7 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 7367ed153f7..e902e04e246 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -781,6 +781,7 @@ object SparkSession extends Logging { class Builder() extends Logging { private val builder = SparkConnectClient.builder() private var client: SparkConnectClient = _ + private[this] val options = new scala.collection.mutable.HashMap[String, String] def remote(connectionString: String): Builder = { builder.connectionString(connectionString) @@ -804,6 +805,84 @@ object SparkSession extends Logging { this } + /** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ + def config(key: String, value: String): Builder = synchronized { + options += key -> value + this + } + + /** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ + def config(key: String, value: Long): Builder = synchronized { + options += key -> value.toString + this + } + + /** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. 
+ * + * @since 3.5.0 + */ + def config(key: String, value: Double): Builder = synchronized { + options += key -> value.toString + this + } + + /** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ + def config(key: String, value: Boolean): Builder = synchronized { + options += key -> value.toString + this + } + + /** + * Sets a config a map of options. Options set using this method are automatically propagated + * to the Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ + def config(map: Map[String, Any]): Builder = synchronized { + map.foreach { kv: (String, Any) => + { + options += kv._1 -> kv._2.toString + } + } + this + } + + /** + * Sets a config option. Options set using this method are automatically propagated to both + * `SparkConf` and SparkSession's own configuration. + * + * @since 3.5.0 + */ + def config(map: java.util.Map[String, Any]): Builder = synchronized { + config(map.asScala.toMap) + } + + @deprecated("enableHiveSupport does not work in Spark Connect") + def enableHiveSupport(): Builder = this + + @deprecated("master does not work in Spark Connect, please use remote instead") + def master(master: String): Builder = this + + @deprecated("appName does not work in Spark Connect") + def appName(name: String): Builder = this + private def tryCreateSessionFromClient(): Option[SparkSession] = { if (client != null) { Option(new SparkSession(client, cleaner, planIdGenerator)) @@ -812,6 +891,12 @@ object SparkSession extends Logging { } } + private def applyOptions(session: SparkSession): Unit = { + options.foreach { case (key, value) => + session.conf.set(key, value) + } + } + /** * Build the [[SparkSession]]. 
* @@ -833,6 +918,7 @@ object SparkSession extends Logging { val session = tryCreateSessionFromClient() .getOrElse(SparkSession.this.create(builder.configuration)) setDefaultAndActiveSession(session) + applyOptions(session) session } @@ -842,7 +928,9 @@ object SparkSession extends Logging { * If a session exist with the same configuration that is returned instead of creating a new * session. * - * This method will update the default and/or active session if they are not set. + * This method will update the default and/or active session if they are not set. This method + * will always set the specified configuration options on the session, even when it is not + * newly created. * * @since 3.5.0 */ @@ -850,6 +938,7 @@ object SparkSession extends Logging { val session = tryCreateSessionFromClient() .getOrElse(sessions.get(builder.configuration)) setDefaultAndActiveSession(session) + applyOptions(session) session } } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala index 86deae982a5..490bdf9cd86 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala @@ -249,4 +249,50 @@ class SparkSessionE2ESuite extends RemoteSparkSession { } assert(e.getMessage contains "OPERATION_CANCELED") } + + test("option propagation") { + val remote = s"sc://localhost:$serverPort" + val session1 = SparkSession + .builder() + .remote(remote) + .config("foo", 12L) + .config("bar", value = true) + .config("bob", 12.0) + .config("heading", "north") + .getOrCreate() + assert(session1.conf.get("foo") == "12") + assert(session1.conf.get("bar") == "true") + assert(session1.conf.get("bob") == String.valueOf(12.0)) + assert(session1.conf.get("heading") == "north") + + // Check if new options are applied to an 
existing session. + val session2 = SparkSession + .builder() + .remote(remote) + .config("heading", "south") + .getOrCreate() + assert(session2 == session1) + assert(session2.conf.get("heading") == "south") + + // Create a completely different session, confs are not supposed to leak. + val session3 = SparkSession + .builder() + .remote(remote) + .config(Map("foo" -> "13", "baar" -> "false", "heading" -> "east")) + .create() + assert(session3 != session1) + assert(session3.conf.get("foo") == "13") + assert(session3.conf.get("baar") == "false") + assert(session3.conf.getOption("bob").isEmpty) + assert(session3.conf.get("heading") == "east") + + // Try to set a static conf. + intercept[Exception] { + SparkSession + .builder() + .remote(remote) + .config("spark.sql.globalTempDatabase", "not_gonna_happen") + .create() + } + } } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala index 2d7ded2d688..4aa8b4360ee 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala @@ -251,4 +251,14 @@ class SparkSessionSuite extends ConnectFunSuite { executor.shutdown() } } + + test("deprecated methods") { + SparkSession + .builder() + .master("yayay") + .appName("bob") + .enableHiveSupport() + .create() + .close() + } } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index 4439a5f3e2a..3fc02d7c397 100644 --- 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -223,14 +223,8 @@ object CheckConnectJvmClientCompatibility { "org.apache.spark.sql.SparkSession#implicits._sqlContext"), // SparkSession#Builder - ProblemFilters.exclude[DirectMissingMethodProblem]( - "org.apache.spark.sql.SparkSession#Builder.appName"), ProblemFilters.exclude[DirectMissingMethodProblem]( "org.apache.spark.sql.SparkSession#Builder.config"), - ProblemFilters.exclude[DirectMissingMethodProblem]( - "org.apache.spark.sql.SparkSession#Builder.master"), - ProblemFilters.exclude[DirectMissingMethodProblem]( - "org.apache.spark.sql.SparkSession#Builder.enableHiveSupport"), ProblemFilters.exclude[DirectMissingMethodProblem]( "org.apache.spark.sql.SparkSession#Builder.withExtensions"), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org